Add docstring and reorder methods

This commit is contained in:
Jules 2025-05-04 11:38:10 +02:00
parent 760a13b19f
commit 717f987083
Signed by: jdejaegh
GPG key ID: 99D6D184CA66933A
6 changed files with 359 additions and 317 deletions

View file

@ -20,7 +20,6 @@ from .data import (AnimationFrameData, CurrentWeatherData, Forecast,
IrmKmiForecast, IrmKmiRadarForecast, RadarAnimationData,
WarningData)
from .pollen import PollenParser
from .utils import next_weekday
_LOGGER = logging.getLogger(__name__)
@ -37,7 +36,6 @@ class IrmKmiApiParametersError(IrmKmiApiError):
"""Exception to indicate a parameter error."""
class IrmKmiApiClient:
"""API client for IRM KMI weather data"""
COORD_DECIMALS = 6
@ -95,6 +93,20 @@ class IrmKmiApiClient:
r: bytes = await self._api_wrapper(base_url=url, params={} if params is None else params)
return r.decode()
def expire_cache(self) -> None:
"""
Expire items from the cache which have not been accessed since self._cache_max_age (default 2h).
Must be called regularly to clear the cache.
"""
now = time.time()
keys_to_delete = set()
for key, value in self._cache.items():
if now - value['timestamp'] > self._cache_max_age:
keys_to_delete.add(key)
for key in keys_to_delete:
del self._cache[key]
_LOGGER.info(f"Expired {len(keys_to_delete)} elements from API cache")
async def _api_wrapper(
self,
params: dict,
@ -151,20 +163,6 @@ class IrmKmiApiClient:
"""Get API key."""
return hashlib.md5(f"r9EnW374jkJ9acc;{method_name};{datetime.now().strftime('%d/%m/%Y')}".encode()).hexdigest()
def expire_cache(self) -> None:
"""
Expire items from the cache which have not been accessed since self._cache_max_age (default 2h).
Must be called regularly to clear the cache.
"""
now = time.time()
keys_to_delete = set()
for key, value in self._cache.items():
if now - value['timestamp'] > self._cache_max_age:
keys_to_delete.add(key)
for key in keys_to_delete:
del self._cache[key]
_LOGGER.info(f"Expired {len(keys_to_delete)} elements from API cache")
class IrmKmiApiClientHa(IrmKmiApiClient):
"""API client for IRM KMI weather data with additional methods to integrate easily with Home Assistant"""
@ -183,22 +181,6 @@ class IrmKmiApiClientHa(IrmKmiApiClient):
"""
self._api_data = await self.get_forecasts_coord(coord)
def get_city(self) -> str | None:
"""
Get the city for which we currently have the forecast
:return: city name as str or None if unavailable
"""
return self._api_data.get('cityName', None)
def get_country(self) -> str | None:
"""
Get the two-letters country code for which we currently have the forecast
:return: country code as str or None if unavailable
"""
return self._api_data.get('country', None)
def get_current_weather(self, tz: ZoneInfo) -> CurrentWeatherData:
"""
Parse the API data we currently have to build a CurrentWeatherData.
@ -272,128 +254,38 @@ class IrmKmiApiClientHa(IrmKmiApiClient):
return current_weather
def _get_uv_index(self) -> float | None:
uv_index = None
module_data = self._api_data.get('module', None)
if not (module_data is None or not isinstance(module_data, list)):
for module in module_data:
if module.get('type', None) == 'uv':
uv_index = module.get('data', {}).get('levelValue')
return uv_index
def _get_now_hourly(self, tz: ZoneInfo) -> dict | None:
now_hourly = None
hourly_forecast_data = self._api_data.get('for', {}).get('hourly')
now = datetime.now(tz)
if not (hourly_forecast_data is None
or not isinstance(hourly_forecast_data, list)
or len(hourly_forecast_data) == 0):
for current in hourly_forecast_data[:4]:
if now.strftime('%H') == current['hour']:
now_hourly = current
break
return now_hourly
def get_daily_forecast(self, tz: ZoneInfo, lang: str) -> List[IrmKmiForecast]:
def get_radar_forecast(self) -> List[IrmKmiRadarForecast]:
"""
Parse the API data we currently have to build the daily forecast list.
Create a list of short term forecasts for rain based on the data provided by the rain radar
:param tz: time zone to use to interpret the timestamps in the forecast (generally is Europe/Brussels)
:param lang: langage to get data for (must be 'fr', 'nl', 'de' or 'en')
:return: chronologically ordered list of daily forecasts
:return: chronologically ordered list of 'few'-minutes radar forecasts
"""
data = self._api_data.get('for', {}).get('daily')
if data is None or not isinstance(data, list) or len(data) == 0:
data = self._api_data.get('animation', {})
if not isinstance(data, dict):
return []
sequence = data.get("sequence", [])
unit = data.get("unit", {}).get("en", None)
ratios = [f['value'] / f['position'] for f in sequence if f['position'] > 0]
forecasts = list()
forecast_day = datetime.now(tz)
if len(ratios) > 0:
ratio = mean(ratios)
else:
ratio = 0
for (idx, f) in enumerate(data):
precipitation = None
if f.get('precipQuantity', None) is not None:
try:
precipitation = float(f.get('precipQuantity'))
except (TypeError, ValueError):
pass
native_wind_gust_speed = None
if f.get('wind', {}).get('peakSpeed') is not None:
try:
native_wind_gust_speed = int(f.get('wind', {}).get('peakSpeed'))
except (TypeError, ValueError):
pass
wind_bearing = None
if f.get('wind', {}).get('dirText', {}).get('en') != 'VAR':
try:
wind_bearing = (float(f.get('wind', {}).get('dir')) + 180) % 360
except (TypeError, ValueError):
pass
is_daytime = f.get('dayNight', None) == 'd'
day_name = f.get('dayName', {}).get('en', None)
timestamp = f.get('timestamp', None)
if timestamp is not None:
forecast_day = datetime.fromisoformat(timestamp)
elif day_name in WEEKDAYS:
forecast_day = next_weekday(forecast_day, WEEKDAYS.index(day_name))
elif day_name in ['Today', 'Tonight']:
forecast_day = datetime.now(tz)
elif day_name == 'Tomorrow':
forecast_day = datetime.now(tz) + timedelta(days=1)
sunrise_sec = f.get('dawnRiseSeconds', None)
if sunrise_sec is None:
sunrise_sec = f.get('sunRise', None)
sunrise = None
if sunrise_sec is not None:
try:
sunrise = (forecast_day.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=tz)
+ timedelta(seconds=float(sunrise_sec)))
except (TypeError, ValueError):
pass
sunset_sec = f.get('dawnSetSeconds', None)
if sunset_sec is None:
sunset_sec = f.get('sunSet', None)
sunset = None
if sunset_sec is not None:
try:
sunset = (forecast_day.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=tz)
+ timedelta(seconds=float(sunset_sec)))
except (TypeError, ValueError):
pass
forecast = IrmKmiForecast(
datetime=(forecast_day.strftime('%Y-%m-%d')),
condition=self._cdt_map.get((f.get('ww1', None), f.get('dayNight', None)), None),
condition_2=self._cdt_map.get((f.get('ww2', None), f.get('dayNight', None)), None),
condition_evol=WWEVOL_TO_ENUM_MAP.get(f.get('wwevol'), None),
native_precipitation=precipitation,
native_temperature=f.get('tempMax', None),
native_templow=f.get('tempMin', None),
native_wind_gust_speed=native_wind_gust_speed,
native_wind_speed=f.get('wind', {}).get('speed'),
precipitation_probability=f.get('precipChance', None),
wind_bearing=wind_bearing,
is_daytime=is_daytime,
text=f.get('text', {}).get(lang, ""),
sunrise=sunrise.isoformat() if sunrise is not None else None,
sunset=sunset.isoformat() if sunset is not None else None
forecast = list()
for f in sequence:
forecast.append(
IrmKmiRadarForecast(
datetime=f.get("time"),
native_precipitation=f.get('value'),
rain_forecast_max=round(f.get('positionHigher') * ratio, 2),
rain_forecast_min=round(f.get('positionLower') * ratio, 2),
might_rain=f.get('positionHigher') > 0,
unit=unit
)
# Swap temperature and templow if needed
if (forecast['native_templow'] is not None
and forecast['native_temperature'] is not None
and forecast['native_templow'] > forecast['native_temperature']):
(forecast['native_templow'], forecast['native_temperature']) = \
(forecast['native_temperature'], forecast['native_templow'])
forecasts.append(forecast)
return forecasts
)
return forecast
def get_hourly_forecast(self, tz: ZoneInfo) -> List[Forecast]:
"""
@ -452,38 +344,105 @@ class IrmKmiApiClientHa(IrmKmiApiClient):
return forecasts
def get_radar_forecast(self) -> List[IrmKmiRadarForecast]:
def get_daily_forecast(self, tz: ZoneInfo, lang: str) -> List[IrmKmiForecast]:
"""
Create a list of short term forecasts for rain based on the data provided by the rain radar
Parse the API data we currently have to build the daily forecast list.
:return: chronologically ordered list of 'few'-minutes radar forecasts
:param tz: time zone to use to interpret the timestamps in the forecast (generally is Europe/Brussels)
:param lang: langage to get data for (must be 'fr', 'nl', 'de' or 'en')
:return: chronologically ordered list of daily forecasts
"""
data = self._api_data.get('animation', {})
if not isinstance(data, dict):
data = self._api_data.get('for', {}).get('daily')
if data is None or not isinstance(data, list) or len(data) == 0:
return []
sequence = data.get("sequence", [])
unit = data.get("unit", {}).get("en", None)
ratios = [f['value'] / f['position'] for f in sequence if f['position'] > 0]
if len(ratios) > 0:
ratio = mean(ratios)
else:
ratio = 0
forecasts = list()
forecast_day = datetime.now(tz)
forecast = list()
for f in sequence:
forecast.append(
IrmKmiRadarForecast(
datetime=f.get("time"),
native_precipitation=f.get('value'),
rain_forecast_max=round(f.get('positionHigher') * ratio, 2),
rain_forecast_min=round(f.get('positionLower') * ratio, 2),
might_rain=f.get('positionHigher') > 0,
unit=unit
for (idx, f) in enumerate(data):
precipitation = None
if f.get('precipQuantity', None) is not None:
try:
precipitation = float(f.get('precipQuantity'))
except (TypeError, ValueError):
pass
native_wind_gust_speed = None
if f.get('wind', {}).get('peakSpeed') is not None:
try:
native_wind_gust_speed = int(f.get('wind', {}).get('peakSpeed'))
except (TypeError, ValueError):
pass
wind_bearing = None
if f.get('wind', {}).get('dirText', {}).get('en') != 'VAR':
try:
wind_bearing = (float(f.get('wind', {}).get('dir')) + 180) % 360
except (TypeError, ValueError):
pass
is_daytime = f.get('dayNight', None) == 'd'
day_name = f.get('dayName', {}).get('en', None)
timestamp = f.get('timestamp', None)
if timestamp is not None:
forecast_day = datetime.fromisoformat(timestamp)
elif day_name in WEEKDAYS:
forecast_day = self._next_weekday(forecast_day, WEEKDAYS.index(day_name))
elif day_name in ['Today', 'Tonight']:
forecast_day = datetime.now(tz)
elif day_name == 'Tomorrow':
forecast_day = datetime.now(tz) + timedelta(days=1)
sunrise_sec = f.get('dawnRiseSeconds', None)
if sunrise_sec is None:
sunrise_sec = f.get('sunRise', None)
sunrise = None
if sunrise_sec is not None:
try:
sunrise = (forecast_day.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=tz)
+ timedelta(seconds=float(sunrise_sec)))
except (TypeError, ValueError):
pass
sunset_sec = f.get('dawnSetSeconds', None)
if sunset_sec is None:
sunset_sec = f.get('sunSet', None)
sunset = None
if sunset_sec is not None:
try:
sunset = (forecast_day.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=tz)
+ timedelta(seconds=float(sunset_sec)))
except (TypeError, ValueError):
pass
forecast = IrmKmiForecast(
datetime=(forecast_day.strftime('%Y-%m-%d')),
condition=self._cdt_map.get((f.get('ww1', None), f.get('dayNight', None)), None),
condition_2=self._cdt_map.get((f.get('ww2', None), f.get('dayNight', None)), None),
condition_evol=WWEVOL_TO_ENUM_MAP.get(f.get('wwevol'), None),
native_precipitation=precipitation,
native_temperature=f.get('tempMax', None),
native_templow=f.get('tempMin', None),
native_wind_gust_speed=native_wind_gust_speed,
native_wind_speed=f.get('wind', {}).get('speed'),
precipitation_probability=f.get('precipChance', None),
wind_bearing=wind_bearing,
is_daytime=is_daytime,
text=f.get('text', {}).get(lang, ""),
sunrise=sunrise.isoformat() if sunrise is not None else None,
sunset=sunset.isoformat() if sunset is not None else None
)
)
return forecast
# Swap temperature and templow if needed
if (forecast['native_templow'] is not None
and forecast['native_temperature'] is not None
and forecast['native_templow'] > forecast['native_temperature']):
(forecast['native_templow'], forecast['native_temperature']) = \
(forecast['native_temperature'], forecast['native_templow'])
forecasts.append(forecast)
return forecasts
def get_animation_data(self, tz: ZoneInfo, lang: str, style: str, dark_mode: bool) -> RadarAnimationData:
"""
@ -605,6 +564,45 @@ class IrmKmiApiClientHa(IrmKmiApiClient):
return PollenParser(pollen_svg).get_pollen_data()
def get_city(self) -> str | None:
"""
Get the city for which we currently have the forecast
:return: city name as str or None if unavailable
"""
return self._api_data.get('cityName', None)
def get_country(self) -> str | None:
"""
Get the two-letters country code for which we currently have the forecast
:return: country code as str or None if unavailable
"""
return self._api_data.get('country', None)
def _get_uv_index(self) -> float | None:
uv_index = None
module_data = self._api_data.get('module', None)
if not (module_data is None or not isinstance(module_data, list)):
for module in module_data:
if module.get('type', None) == 'uv':
uv_index = module.get('data', {}).get('levelValue')
return uv_index
def _get_now_hourly(self, tz: ZoneInfo) -> dict | None:
now_hourly = None
hourly_forecast_data = self._api_data.get('for', {}).get('hourly')
now = datetime.now(tz)
if not (hourly_forecast_data is None
or not isinstance(hourly_forecast_data, list)
or len(hourly_forecast_data) == 0):
for current in hourly_forecast_data[:4]:
if now.strftime('%H') == current['hour']:
now_hourly = current
break
return now_hourly
@staticmethod
def _merge_url_and_params(url: str, params: dict) -> str:
"""Merge query string params in the URL"""
@ -615,3 +613,9 @@ class IrmKmiApiClientHa(IrmKmiApiClient):
new_url = parsed_url._replace(query=new_query)
return str(urllib.parse.urlunparse(new_url))
@staticmethod
def _next_weekday(current, weekday):
days_ahead = weekday - current.weekday()
if days_ahead < 0:
days_ahead += 7
return current + timedelta(days_ahead)

View file

@ -6,6 +6,8 @@ POLLEN_NAMES: Final = {'Alder', 'Ash', 'Birch', 'Grasses', 'Hazel', 'Mugwort', '
POLLEN_LEVEL_TO_COLOR = {'null': 'green', 'low': 'yellow', 'moderate': 'orange', 'high': 'red', 'very high': 'purple',
'active': 'active'}
WEEKDAYS = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
# TODO move those to an Enum
OPTION_STYLE_STD: Final = 'standard_style'
OPTION_STYLE_CONTRAST: Final = 'contrast_style'
OPTION_STYLE_YELLOW_RED: Final = 'yellow_red_style'
@ -16,6 +18,7 @@ STYLE_TO_PARAM_MAP: Final = {
OPTION_STYLE_YELLOW_RED: 3,
OPTION_STYLE_SATELLITE: 4
}
MAP_WARNING_ID_TO_SLUG: Final = {
0: 'wind',
1: 'rain',

View file

@ -21,39 +21,13 @@ class PollenParser:
):
self._xml = xml_string
@staticmethod
def get_default_data() -> dict:
"""Return all the known pollen with 'none' value"""
return {k.lower(): 'none' for k in POLLEN_NAMES}
@staticmethod
def get_unavailable_data() -> dict:
"""Return all the known pollen with 'none' value"""
return {k.lower(): None for k in POLLEN_NAMES}
@staticmethod
def get_option_values() -> List[str]:
"""List all the values that the pollen can have"""
return list(POLLEN_LEVEL_TO_COLOR.values()) + ['none']
@staticmethod
def _extract_elements(root) -> List[ET.Element]:
"""Recursively collect all elements of the SVG in a list"""
elements = []
for child in root:
elements.append(child)
elements.extend(PollenParser._extract_elements(child))
return elements
@staticmethod
def _get_elem_text(e) -> str | None:
if e.text is not None:
return e.text.strip()
return None
def get_pollen_data(self) -> dict:
"""From the XML string, parse the SVG and extract the pollen data from the image.
If an error occurs, return the default value"""
"""
Parse the SVG and extract the pollen data from the image.
If an error occurs, return the default value.
:return: pollen dict
"""
pollen_data = self.get_default_data()
try:
_LOGGER.debug(f"Full SVG: {self._xml}")
@ -106,3 +80,34 @@ class PollenParser:
_LOGGER.debug(f"Pollen data: {pollen_data}")
return pollen_data
@staticmethod
def get_default_data() -> dict:
"""Return all the known pollen with 'none' value"""
return {k.lower(): 'none' for k in POLLEN_NAMES}
@staticmethod
def get_unavailable_data() -> dict:
"""Return all the known pollen with None value"""
return {k.lower(): None for k in POLLEN_NAMES}
@staticmethod
def get_option_values() -> List[str]:
"""List all the values that the pollen can have"""
return list(POLLEN_LEVEL_TO_COLOR.values()) + ['none']
@staticmethod
def _extract_elements(root) -> List[ET.Element]:
"""Recursively collect all elements of the SVG in a list"""
elements = []
for child in root:
elements.append(child)
elements.extend(PollenParser._extract_elements(child))
return elements
@staticmethod
def _get_elem_text(e) -> str | None:
if e.text is not None:
return e.text.strip()
return None

View file

@ -4,7 +4,7 @@ import base64
import copy
import datetime
import logging
from typing import Any, List, Self
from typing import List, Self
import async_timeout
from svgwrite import Drawing
@ -20,6 +20,8 @@ _LOGGER = logging.getLogger(__name__)
class RainGraph:
"""Create and get rain radar animated SVG"""
def __init__(self,
animation_data: RadarAnimationData,
country: str,
@ -77,74 +79,110 @@ class RainGraph:
async def build(self) -> Self:
"""Build the rain graph by calling all the method in the right order. Returns self when done"""
await self.draw_svg_frame()
self.draw_hour_bars()
self.draw_chances_path()
self.draw_data_line()
self.write_hint()
await self.insert_background()
await self._draw_svg_frame()
self._draw_hour_bars()
self._draw_chances_path()
self._draw_data_line()
self._write_hint()
await self._insert_background()
self._dwg_save = copy.deepcopy(self._dwg)
return self
async def get_animated(self) -> bytes:
"""Get the animated SVG. If called for the first time since refresh, downloads the images to build the file."""
"""
Get the animated SVG. If called for the first time, downloads the cloud images to build the file.
:return: utf-8 encoded animated SVG string
:raises: ValueError if build() was not called before
"""
if self._dwg_save is None:
raise ValueError("You need to call .build() before getting the SVG")
_LOGGER.info(f"Get animated with _dwg_animated {self._dwg_animated}")
if self._dwg_animated is None:
clouds = self.download_clouds()
clouds = self._download_clouds()
self._dwg = copy.deepcopy(self._dwg_save)
self.draw_current_fame_line()
self.draw_description_text()
self._draw_current_fame_line()
self._draw_description_text()
try:
await clouds
self.insert_cloud_layer()
self._insert_cloud_layer()
except IrmKmiApiError as err:
_LOGGER.warning(f"Could not download clouds from API: {err}")
await self.draw_location()
await self._draw_location()
self._dwg_animated = self._dwg
return self.get_svg_string(still_image=False)
return self._get_svg_string(still_image=False)
async def get_still(self) -> bytes:
"""Get the animated SVG. If called for the first time since refresh, downloads the images to build the file."""
"""
Get the still SVG. If called for the first time, downloads the cloud images to build the file.
:return: utf-8 encoded SVG string
:raises: ValueError if build() was not called before
"""
if self._dwg_save is None:
raise ValueError("You need to call .build() before getting the SVG")
_LOGGER.info(f"Get still with _dwg_still {self._dwg_still}")
if self._dwg_still is None:
idx = self._animation_data['most_recent_image_idx']
cloud = self.download_clouds(idx)
cloud = self._download_clouds(idx)
self._dwg = copy.deepcopy(self._dwg_save)
self.draw_current_fame_line(idx)
self.draw_description_text(idx)
self._draw_current_fame_line(idx)
self._draw_description_text(idx)
try:
await cloud
self.insert_cloud_layer(idx)
self._insert_cloud_layer(idx)
except IrmKmiApiError as err:
_LOGGER.warning(f"Could not download clouds from API: {err}")
await self.draw_location()
await self._draw_location()
self._dwg_still = self._dwg
return self.get_svg_string(still_image=True)
return self._get_svg_string(still_image=True)
async def download_clouds(self, idx = None):
def get_hint(self) -> str:
"""
Get hint to display on the rain graph
:return: hint sentence as str
"""
return self._animation_data.get('hint', '')
async def _download_clouds(self, idx=None):
"""
Download cloud images and save the result in the internal state.
:param idx: index of the image to download (if not specified, downloads all the images)
:raises: IrmKmiApiError if communication with the API fails
"""
imgs = [e['image'] for e in self._animation_data['sequence']]
if idx is not None and type(imgs[idx]) is str:
_LOGGER.info("Download single cloud image")
print("Download single cloud image")
result = await self.download_images_from_api([imgs[idx]])
result = await self._download_images_from_api([imgs[idx]])
self._animation_data['sequence'][idx]['image'] = result[0]
else:
_LOGGER.info("Download many cloud images")
result = await self.download_images_from_api([img for img in imgs if type(img) is str])
result = await self._download_images_from_api([img for img in imgs if type(img) is str])
for i in range(len(self._animation_data['sequence'])):
if type(self._animation_data['sequence'][i]['image']) is str:
self._animation_data['sequence'][i]['image'] = result[0]
result = result[1:]
async def download_images_from_api(self, urls: list[str]) -> list[Any]:
"""Download a batch of images to create the radar frames."""
async def _download_images_from_api(self, urls: list[str]) -> list[bytes]:
"""
Download a batch of images to create the radar frames.
:param urls: list of urls to download
:return: list images downloaded as bytes
:raises: IrmKmiApiError if communication with the API fails
"""
coroutines = list()
for url in urls:
@ -155,10 +193,7 @@ class RainGraph:
_LOGGER.info(f"Just downloaded {len(images_from_api)} images")
return images_from_api
def get_hint(self) -> str:
return self._animation_data.get('hint', None)
async def draw_svg_frame(self):
async def _draw_svg_frame(self):
"""Create the global area to draw the other items"""
mimetype = "application/x-font-ttf"
@ -167,9 +202,7 @@ class RainGraph:
self._dwg.embed_stylesheet(content)
self._dwg.embed_stylesheet("""
.roboto {
font-family: "Roboto Medium";
}
.roboto { font-family: "Roboto Medium"; }
""")
fill_color = '#393C40' if self._dark_mode else '#385E95'
@ -178,7 +211,7 @@ class RainGraph:
rx=None, ry=None,
fill=fill_color, stroke='none'))
def draw_description_text(self, idx: int | None = None):
def _draw_description_text(self, idx: int | None = None):
"""For every frame write the amount of precipitation and the time at the top of the graph.
If idx is set, only do it for the given idx"""
@ -192,7 +225,7 @@ class RainGraph:
paragraph = self._dwg.add(self._dwg.g(class_="roboto", ))
self.write_time_and_rain(paragraph, rain_level, time)
self._write_time_and_rain(paragraph, rain_level, time)
return
for i in range(self._frame_count):
@ -212,9 +245,9 @@ class RainGraph:
repeatCount="indefinite"
))
self.write_time_and_rain(paragraph, rain_level, time)
self._write_time_and_rain(paragraph, rain_level, time)
def write_time_and_rain(self, paragraph, rain_level, time):
def _write_time_and_rain(self, paragraph, rain_level, time):
"""Using the paragraph object, write the time and rain level data"""
paragraph.add(self._dwg.text(f"{time}", insert=(self._offset, self._top_text_y_pos),
text_anchor="start",
@ -227,11 +260,11 @@ class RainGraph:
fill="white",
stroke='none'))
def write_hint(self):
def _write_hint(self):
"""Add the hint text at the bottom of the graph"""
paragraph = self._dwg.add(self._dwg.g(class_="roboto", ))
hint = self._animation_data['hint']
hint = self.get_hint()
paragraph.add(self._dwg.text(f"{hint}", insert=(self._svg_width / 2, self._bottom_text_y_pos),
text_anchor="middle",
@ -239,7 +272,7 @@ class RainGraph:
fill="white",
stroke='none'))
def draw_chances_path(self):
def _draw_chances_path(self):
"""Draw the prevision margin area around the main forecast line"""
list_lower_points = []
list_higher_points = []
@ -264,36 +297,9 @@ class RainGraph:
graph_rect_right -= self._interval_width
if list_higher_points and list_lower_points:
self.draw_chance_precip(list_higher_points, list_lower_points)
self._draw_chance_precip(list_higher_points, list_lower_points)
def draw_chance_precip(self, list_higher_points: List, list_lower_points: List):
"""Draw the blue solid line representing the actual rain forecast"""
precip_higher_chance_path = self._dwg.path(fill='#63c8fa', stroke='none', opacity=.3)
list_higher_points[-1] = tuple(list(list_higher_points[-1]) + ['last'])
self.set_curved_path(precip_higher_chance_path, list_higher_points + list_lower_points)
self._dwg.add(precip_higher_chance_path)
@staticmethod
def set_curved_path(path, points):
"""Pushes points on the path by creating a nice curve between them"""
if len(points) < 2:
return
path.push('M', *points[0])
for i in range(1, len(points)):
x_mid = (points[i - 1][0] + points[i][0]) / 2
y_mid = (points[i - 1][1] + points[i][1]) / 2
path.push('Q', points[i - 1][0], points[i - 1][1], x_mid, y_mid)
if points[i][-1] == 'last' or points[i - 1][-1] == 'last':
path.push('Q', points[i][0], points[i][1], points[i][0], points[i][1])
path.push('Q', points[-1][0], points[-1][1], points[-1][0], points[-1][1])
def draw_data_line(self):
def _draw_data_line(self):
"""Draw the main data line for the rain forecast"""
rain_list: List[AnimationFrameData] = self._animation_data['sequence']
graph_rect_left = self._offset
@ -308,10 +314,10 @@ class RainGraph:
graph_rect_top + (1.0 - position) * self._graph_height))
graph_rect_left += self._interval_width
data_line_path = self._dwg.path(fill='none', stroke='#63c8fa', stroke_width=2)
self.set_curved_path(data_line_path, entry_list)
self._set_curved_path(data_line_path, entry_list)
self._dwg.add(data_line_path)
def draw_hour_bars(self):
def _draw_hour_bars(self):
"""Draw the small bars at the bottom to represent the time"""
hour_bar_height = 8
horizontal_inset = self._offset
@ -350,7 +356,7 @@ class RainGraph:
end=(self._graph_width + self._interval_width / 2, self._graph_bottom),
stroke='white'))
def draw_current_fame_line(self, idx: int | None = None):
def _draw_current_fame_line(self, idx: int | None = None):
"""Draw a solid white line on the timeline at the position of the given frame index"""
x_position = self._offset if idx is None else self._offset + idx * self._interval_width
now = self._dwg.add(self._dwg.line(start=(x_position, self._top_text_space),
@ -368,15 +374,21 @@ class RainGraph:
dur=f"{self._frame_count * 0.3}s",
repeatCount="indefinite"))
def get_svg_string(self, still_image: bool = False) -> bytes:
def _get_svg_string(self, still_image: bool = False) -> bytes:
"""
Get the utf-8 encoded string representing the SVG
:param still_image: if true the non-animated version is returned
:return: utf-8 encoded string
"""
return self._dwg_still.tostring().encode() if still_image else self._dwg_animated.tostring().encode()
async def insert_background(self):
png_data = self.get_background_png_b64()
async def _insert_background(self):
png_data = self._get_background_png_b64()
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
self._dwg.add(image)
def insert_cloud_layer(self, idx: int | None = None):
def _insert_cloud_layer(self, idx: int | None = None):
imgs = [e['image'] for e in self._animation_data['sequence']]
if idx is not None:
@ -402,23 +414,22 @@ class RainGraph:
repeatCount="indefinite"
))
async def draw_location(self):
async def _draw_location(self):
img = self._animation_data['location']
_LOGGER.info(f"Draw location layer with img of type {type(img)}")
if type(img) is str:
result = await self.download_images_from_api([img])
result = await self._download_images_from_api([img])
img = result[0]
self._animation_data['location'] = img
png_data = base64.b64encode(img).decode('utf-8')
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
self._dwg.add(image)
def get_dwg(self):
def _get_dwg(self) -> Drawing:
return copy.deepcopy(self._dwg)
def get_background_png_b64(self):
_LOGGER.debug(f"Get b64 for {self._country} {self._style} {'dark' if self._dark_mode else 'light'} mode")
def _get_background_png_b64(self) -> str:
if self._country == 'NL':
return nl.nl_b64
elif self._style == OPTION_STYLE_SATELLITE:
@ -427,3 +438,30 @@ class RainGraph:
return be_black.be_black_b64
else:
return be_white.be_white_b64
def _draw_chance_precip(self, list_higher_points: List, list_lower_points: List):
"""Draw the blue solid line representing the actual rain forecast"""
precip_higher_chance_path = self._dwg.path(fill='#63c8fa', stroke='none', opacity=.3)
list_higher_points[-1] = tuple(list(list_higher_points[-1]) + ['last'])
self._set_curved_path(precip_higher_chance_path, list_higher_points + list_lower_points)
self._dwg.add(precip_higher_chance_path)
@staticmethod
def _set_curved_path(path, points):
"""Pushes points on the path by creating a nice curve between them"""
if len(points) < 2:
return
path.push('M', *points[0])
for i in range(1, len(points)):
x_mid = (points[i - 1][0] + points[i][0]) / 2
y_mid = (points[i - 1][1] + points[i][1]) / 2
path.push('Q', points[i - 1][0], points[i - 1][1], x_mid, y_mid)
if points[i][-1] == 'last' or points[i - 1][-1] == 'last':
path.push('Q', points[i][0], points[i][1], points[i][0], points[i][1])
path.push('Q', points[-1][0], points[-1][1], points[-1][0], points[-1][1])

View file

@ -1,8 +0,0 @@
from datetime import timedelta
def next_weekday(current, weekday):
days_ahead = weekday - current.weekday()
if days_ahead < 0:
days_ahead += 7
return current + timedelta(days_ahead)

View file

@ -48,9 +48,9 @@ async def test_svg_frame_setup():
style='STD',
)
await rain_graph.draw_svg_frame()
await rain_graph._draw_svg_frame()
svg_str = rain_graph.get_dwg().tostring()
svg_str = rain_graph._get_dwg().tostring()
with open("irm_kmi_api/resources/roboto_medium.ttf", "rb") as file:
font_b64 = base64.b64encode(file.read()).decode('utf-8')
@ -68,9 +68,9 @@ def test_svg_hint():
style='STD',
)
rain_graph.write_hint()
rain_graph._write_hint()
svg_str = rain_graph.get_dwg().tostring()
svg_str = rain_graph._get_dwg().tostring()
assert "Testing SVG camera" in svg_str
@ -84,9 +84,9 @@ def test_svg_time_bars():
style='STD',
)
rain_graph.draw_hour_bars()
rain_graph._draw_hour_bars()
svg_str = rain_graph.get_dwg().tostring()
svg_str = rain_graph._get_dwg().tostring()
assert "19h" in svg_str
assert "20h" in svg_str
@ -103,9 +103,9 @@ def test_draw_chances_path():
style='STD',
)
rain_graph.draw_chances_path()
rain_graph._draw_chances_path()
svg_str = rain_graph.get_dwg().tostring()
svg_str = rain_graph._get_dwg().tostring()
assert 'fill="#63c8fa"' in svg_str
assert 'opacity="0.3"' in svg_str
@ -121,9 +121,9 @@ def test_draw_data_line():
style='STD',
)
rain_graph.draw_data_line()
rain_graph._draw_data_line()
svg_str = rain_graph.get_dwg().tostring()
svg_str = rain_graph._get_dwg().tostring()
assert 'fill="none"' in svg_str
assert 'stroke-width="2"' in svg_str
@ -139,12 +139,12 @@ async def test_insert_background():
style='STD',
)
await rain_graph.insert_background()
await rain_graph._insert_background()
with open("irm_kmi_api/resources/be_white.png", "rb") as file:
png_b64 = base64.b64encode(file.read()).decode('utf-8')
svg_str = rain_graph.get_dwg().tostring()
svg_str = rain_graph._get_dwg().tostring()
assert png_b64 in svg_str
assert "<image " in svg_str
@ -162,9 +162,9 @@ def test_draw_current_frame_line_moving():
style='STD',
)
rain_graph.draw_current_fame_line()
rain_graph._draw_current_fame_line()
str_svg = rain_graph.get_dwg().tostring()
str_svg = rain_graph._get_dwg().tostring()
assert '<line' in str_svg
assert 'id="now"' in str_svg
@ -190,9 +190,9 @@ def test_draw_current_frame_line_index():
style='STD',
)
rain_graph.draw_current_fame_line(0)
rain_graph._draw_current_fame_line(0)
str_svg = rain_graph.get_dwg().tostring()
str_svg = rain_graph._get_dwg().tostring()
assert '<line' in str_svg
assert 'id="now"' in str_svg
@ -219,9 +219,9 @@ def test_draw_description_text():
style='STD',
)
rain_graph.draw_description_text()
rain_graph._draw_description_text()
str_svg = rain_graph.get_dwg().tostring()
str_svg = rain_graph._get_dwg().tostring()
assert "18:30" in str_svg
assert "18:40" in str_svg
@ -246,9 +246,9 @@ def test_draw_cloud_layer():
style='STD',
)
rain_graph.insert_cloud_layer()
rain_graph._insert_cloud_layer()
str_svg = rain_graph.get_dwg().tostring()
str_svg = rain_graph._get_dwg().tostring()
with open("tests/fixtures/clouds_be.png", "rb") as file:
png_b64 = base64.b64encode(file.read()).decode('utf-8')
@ -266,9 +266,9 @@ async def test_draw_location_layer():
style='STD',
)
await rain_graph.draw_location()
await rain_graph._draw_location()
str_svg = rain_graph.get_dwg().tostring()
str_svg = rain_graph._get_dwg().tostring()
with open("tests/fixtures/loc_layer_be_n.png", "rb") as file:
png_b64 = base64.b64encode(file.read()).decode('utf-8')
@ -312,7 +312,7 @@ async def test_download_single_cloud():
rain_graph._api_client = MagicMock()
rain_graph._api_client.get_image = AsyncMock()
await rain_graph.download_clouds(2)
await rain_graph._download_clouds(2)
rain_graph._api_client.get_image.assert_called_once_with('image-url-2')
@ -330,7 +330,7 @@ async def test_download_many_clouds():
rain_graph._api_client = MagicMock()
rain_graph._api_client.get_image = AsyncMock()
await rain_graph.download_clouds()
await rain_graph._download_clouds()
for i in range(10):
rain_graph._api_client.get_image.assert_any_call(f'image-url-{i}')