mirror of
https://github.com/jdejaegh/irm-kmi-ha.git
synced 2025-06-27 11:39:26 +02:00
Compare commits
3 commits
196d4cc178
...
0776cff6d6
Author | SHA1 | Date | |
---|---|---|---|
0776cff6d6 | |||
93bda52ac8 | |||
48fca3197f |
9 changed files with 204 additions and 123 deletions
|
@ -3,13 +3,14 @@ from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import socket
|
import socket
|
||||||
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
import async_timeout
|
import async_timeout
|
||||||
from aiohttp import ClientResponse
|
|
||||||
from .const import USER_AGENT
|
from .const import USER_AGENT
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
@ -35,6 +36,8 @@ def _api_key(method_name: str) -> str:
|
||||||
class IrmKmiApiClient:
|
class IrmKmiApiClient:
|
||||||
"""API client for IRM KMI weather data"""
|
"""API client for IRM KMI weather data"""
|
||||||
COORD_DECIMALS = 6
|
COORD_DECIMALS = 6
|
||||||
|
cache_max_age = 60 * 60 * 2 # Remove items from the cache if they have not been hit since 2 hours
|
||||||
|
cache = {}
|
||||||
|
|
||||||
def __init__(self, session: aiohttp.ClientSession) -> None:
|
def __init__(self, session: aiohttp.ClientSession) -> None:
|
||||||
self._session = session
|
self._session = session
|
||||||
|
@ -47,18 +50,18 @@ class IrmKmiApiClient:
|
||||||
coord['lat'] = round(coord['lat'], self.COORD_DECIMALS)
|
coord['lat'] = round(coord['lat'], self.COORD_DECIMALS)
|
||||||
coord['long'] = round(coord['long'], self.COORD_DECIMALS)
|
coord['long'] = round(coord['long'], self.COORD_DECIMALS)
|
||||||
|
|
||||||
response = await self._api_wrapper(params={"s": "getForecasts", "k": _api_key("getForecasts")} | coord)
|
response: bytes = await self._api_wrapper(params={"s": "getForecasts", "k": _api_key("getForecasts")} | coord)
|
||||||
return await response.json()
|
return json.loads(response)
|
||||||
|
|
||||||
async def get_image(self, url, params: dict | None = None) -> bytes:
|
async def get_image(self, url, params: dict | None = None) -> bytes:
|
||||||
"""Get the image at the specified url with the parameters"""
|
"""Get the image at the specified url with the parameters"""
|
||||||
r: ClientResponse = await self._api_wrapper(base_url=url, params={} if params is None else params)
|
r: bytes = await self._api_wrapper(base_url=url, params={} if params is None else params)
|
||||||
return await r.read()
|
return r
|
||||||
|
|
||||||
async def get_svg(self, url, params: dict | None = None) -> str:
|
async def get_svg(self, url, params: dict | None = None) -> str:
|
||||||
"""Get SVG as str at the specified url with the parameters"""
|
"""Get SVG as str at the specified url with the parameters"""
|
||||||
r: ClientResponse = await self._api_wrapper(base_url=url, params={} if params is None else params)
|
r: bytes = await self._api_wrapper(base_url=url, params={} if params is None else params)
|
||||||
return await r.text()
|
return r.decode()
|
||||||
|
|
||||||
async def _api_wrapper(
|
async def _api_wrapper(
|
||||||
self,
|
self,
|
||||||
|
@ -68,24 +71,41 @@ class IrmKmiApiClient:
|
||||||
method: str = "get",
|
method: str = "get",
|
||||||
data: dict | None = None,
|
data: dict | None = None,
|
||||||
headers: dict | None = None,
|
headers: dict | None = None,
|
||||||
) -> any:
|
) -> bytes:
|
||||||
"""Get information from the API."""
|
"""Get information from the API."""
|
||||||
|
url = f"{self._base_url if base_url is None else base_url}{path}"
|
||||||
|
|
||||||
if headers is None:
|
if headers is None:
|
||||||
headers = {'User-Agent': USER_AGENT}
|
headers = {'User-Agent': USER_AGENT}
|
||||||
else:
|
else:
|
||||||
headers['User-Agent'] = USER_AGENT
|
headers['User-Agent'] = USER_AGENT
|
||||||
|
|
||||||
|
if url in self.cache:
|
||||||
|
headers['If-None-Match'] = self.cache[url]['etag']
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with async_timeout.timeout(60):
|
async with async_timeout.timeout(60):
|
||||||
response = await self._session.request(
|
response = await self._session.request(
|
||||||
method=method,
|
method=method,
|
||||||
url=f"{self._base_url if base_url is None else base_url}{path}",
|
url=url,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
json=data,
|
json=data,
|
||||||
params=params
|
params=params
|
||||||
)
|
)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
return response
|
|
||||||
|
if response.status == 304:
|
||||||
|
_LOGGER.debug(f"Cache hit for {url}")
|
||||||
|
self.cache[url]['timestamp'] = time.time()
|
||||||
|
return self.cache[url]['response']
|
||||||
|
|
||||||
|
if 'ETag' in response.headers:
|
||||||
|
_LOGGER.debug(f"Saving in cache {url}")
|
||||||
|
r = await response.read()
|
||||||
|
self.cache[url] = {'etag': response.headers['ETag'], 'response': r, 'timestamp': time.time()}
|
||||||
|
return r
|
||||||
|
|
||||||
|
return await response.read()
|
||||||
|
|
||||||
except asyncio.TimeoutError as exception:
|
except asyncio.TimeoutError as exception:
|
||||||
raise IrmKmiApiCommunicationError("Timeout error fetching information") from exception
|
raise IrmKmiApiCommunicationError("Timeout error fetching information") from exception
|
||||||
|
@ -93,3 +113,13 @@ class IrmKmiApiClient:
|
||||||
raise IrmKmiApiCommunicationError("Error fetching information") from exception
|
raise IrmKmiApiCommunicationError("Error fetching information") from exception
|
||||||
except Exception as exception: # pylint: disable=broad-except
|
except Exception as exception: # pylint: disable=broad-except
|
||||||
raise IrmKmiApiError(f"Something really wrong happened! {exception}") from exception
|
raise IrmKmiApiError(f"Something really wrong happened! {exception}") from exception
|
||||||
|
|
||||||
|
def expire_cache(self):
|
||||||
|
now = time.time()
|
||||||
|
keys_to_delete = set()
|
||||||
|
for key, value in self.cache.items():
|
||||||
|
if now - value['timestamp'] > self.cache_max_age:
|
||||||
|
keys_to_delete.add(key)
|
||||||
|
for key in keys_to_delete:
|
||||||
|
del self.cache[key]
|
||||||
|
_LOGGER.info(f"Expired {len(keys_to_delete)} elements from API cache")
|
||||||
|
|
|
@ -46,19 +46,14 @@ class IrmKmiRadar(CoordinatorEntity, Camera):
|
||||||
"""Return the interval between frames of the mjpeg stream."""
|
"""Return the interval between frames of the mjpeg stream."""
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
def camera_image(self,
|
|
||||||
width: int | None = None,
|
|
||||||
height: int | None = None) -> bytes | None:
|
|
||||||
"""Return still image to be used as thumbnail."""
|
|
||||||
return self.coordinator.data.get('animation', {}).get('svg_still')
|
|
||||||
|
|
||||||
async def async_camera_image(
|
async def async_camera_image(
|
||||||
self,
|
self,
|
||||||
width: int | None = None,
|
width: int | None = None,
|
||||||
height: int | None = None
|
height: int | None = None
|
||||||
) -> bytes | None:
|
) -> bytes | None:
|
||||||
"""Return still image to be used as thumbnail."""
|
"""Return still image to be used as thumbnail."""
|
||||||
return self.camera_image()
|
if self.coordinator.data.get('animation', None) is not None:
|
||||||
|
return await self.coordinator.data.get('animation').get_still()
|
||||||
|
|
||||||
async def handle_async_still_stream(self, request: web.Request, interval: float) -> web.StreamResponse:
|
async def handle_async_still_stream(self, request: web.Request, interval: float) -> web.StreamResponse:
|
||||||
"""Generate an HTTP MJPEG stream from camera images."""
|
"""Generate an HTTP MJPEG stream from camera images."""
|
||||||
|
@ -73,8 +68,8 @@ class IrmKmiRadar(CoordinatorEntity, Camera):
|
||||||
"""Returns the animated svg for camera display"""
|
"""Returns the animated svg for camera display"""
|
||||||
# If this is not done this way, the live view can only be opened once
|
# If this is not done this way, the live view can only be opened once
|
||||||
self._image_index = not self._image_index
|
self._image_index = not self._image_index
|
||||||
if self._image_index:
|
if self._image_index and self.coordinator.data.get('animation', None) is not None:
|
||||||
return self.coordinator.data.get('animation', {}).get('svg_animated')
|
return await self.coordinator.data.get('animation').get_animated()
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@ -86,5 +81,7 @@ class IrmKmiRadar(CoordinatorEntity, Camera):
|
||||||
@property
|
@property
|
||||||
def extra_state_attributes(self) -> dict:
|
def extra_state_attributes(self) -> dict:
|
||||||
"""Return the camera state attributes."""
|
"""Return the camera state attributes."""
|
||||||
attrs = {"hint": self.coordinator.data.get('animation', {}).get('hint')}
|
rain_graph = self.coordinator.data.get('animation', None)
|
||||||
|
hint = rain_graph.get_hint() if rain_graph is not None else None
|
||||||
|
attrs = {"hint": hint}
|
||||||
return attrs
|
return attrs
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
"""DataUpdateCoordinator for the IRM KMI integration."""
|
"""DataUpdateCoordinator for the IRM KMI integration."""
|
||||||
import asyncio
|
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from statistics import mean
|
from statistics import mean
|
||||||
from typing import Any, List, Tuple
|
from typing import List
|
||||||
|
import urllib.parse
|
||||||
|
|
||||||
import async_timeout
|
import async_timeout
|
||||||
from homeassistant.components.weather import Forecast
|
from homeassistant.components.weather import Forecast
|
||||||
|
@ -24,9 +24,10 @@ from .const import IRM_KMI_TO_HA_CONDITION_MAP as CDT_MAP
|
||||||
from .const import MAP_WARNING_ID_TO_SLUG as SLUG_MAP
|
from .const import MAP_WARNING_ID_TO_SLUG as SLUG_MAP
|
||||||
from .const import (OPTION_STYLE_SATELLITE, OUT_OF_BENELUX, STYLE_TO_PARAM_MAP,
|
from .const import (OPTION_STYLE_SATELLITE, OUT_OF_BENELUX, STYLE_TO_PARAM_MAP,
|
||||||
WEEKDAYS)
|
WEEKDAYS)
|
||||||
from .data import (AnimationFrameData, CurrentWeatherData, IrmKmiForecast,
|
from .data import (CurrentWeatherData, IrmKmiForecast,
|
||||||
IrmKmiRadarForecast, ProcessedCoordinatorData,
|
ProcessedCoordinatorData,
|
||||||
RadarAnimationData, WarningData)
|
WarningData)
|
||||||
|
from .radar_data import IrmKmiRadarForecast, AnimationFrameData, RadarAnimationData
|
||||||
from .pollen import PollenParser
|
from .pollen import PollenParser
|
||||||
from .rain_graph import RainGraph
|
from .rain_graph import RainGraph
|
||||||
from .utils import (disable_from_config, get_config_value, next_weekday,
|
from .utils import (disable_from_config, get_config_value, next_weekday,
|
||||||
|
@ -66,6 +67,7 @@ class IrmKmiCoordinator(TimestampDataUpdateCoordinator):
|
||||||
This is the place to pre-process the data to lookup tables
|
This is the place to pre-process the data to lookup tables
|
||||||
so entities can quickly look up their data.
|
so entities can quickly look up their data.
|
||||||
"""
|
"""
|
||||||
|
self._api_client.expire_cache()
|
||||||
if (zone := self.hass.states.get(self._zone)) is None:
|
if (zone := self.hass.states.get(self._zone)) is None:
|
||||||
raise UpdateFailed(f"Zone '{self._zone}' not found")
|
raise UpdateFailed(f"Zone '{self._zone}' not found")
|
||||||
try:
|
try:
|
||||||
|
@ -112,7 +114,7 @@ class IrmKmiCoordinator(TimestampDataUpdateCoordinator):
|
||||||
"""Refresh data and log errors."""
|
"""Refresh data and log errors."""
|
||||||
await self._async_refresh(log_failures=True, raise_on_entry_error=True)
|
await self._async_refresh(log_failures=True, raise_on_entry_error=True)
|
||||||
|
|
||||||
async def _async_animation_data(self, api_data: dict) -> RadarAnimationData:
|
async def _async_animation_data(self, api_data: dict) -> RainGraph | None:
|
||||||
"""From the API data passed in, call the API to get all the images and create the radar animation data object.
|
"""From the API data passed in, call the API to get all the images and create the radar animation data object.
|
||||||
Frames from the API are merged with the background map and the location marker to create each frame."""
|
Frames from the API are merged with the background map and the location marker to create each frame."""
|
||||||
animation_data = api_data.get('animation', {}).get('sequence')
|
animation_data = api_data.get('animation', {}).get('sequence')
|
||||||
|
@ -120,16 +122,13 @@ class IrmKmiCoordinator(TimestampDataUpdateCoordinator):
|
||||||
country = api_data.get('country', '')
|
country = api_data.get('country', '')
|
||||||
|
|
||||||
if animation_data is None or localisation_layer_url is None or not isinstance(animation_data, list):
|
if animation_data is None or localisation_layer_url is None or not isinstance(animation_data, list):
|
||||||
return RadarAnimationData()
|
return None
|
||||||
|
|
||||||
try:
|
localisation = self.merge_url_and_params(localisation_layer_url,
|
||||||
images_from_api = await self.download_images_from_api(animation_data, country, localisation_layer_url)
|
{'th': 'd' if country == 'NL' or not self._dark_mode else 'n'})
|
||||||
except IrmKmiApiError as err:
|
images_from_api = [self.merge_url_and_params(frame.get('uri'), {'rs': STYLE_TO_PARAM_MAP[self._style]})
|
||||||
_LOGGER.warning(f"Could not get images for weather radar: {err}. Keep the existing radar data.")
|
for frame in animation_data if frame is not None and frame.get('uri') is not None
|
||||||
return self.data.get('animation', RadarAnimationData()) if self.data is not None else RadarAnimationData()
|
]
|
||||||
|
|
||||||
localisation = images_from_api[0]
|
|
||||||
images_from_api = images_from_api[1:]
|
|
||||||
|
|
||||||
lang = preferred_language(self.hass, self.config_entry)
|
lang = preferred_language(self.hass, self.config_entry)
|
||||||
radar_animation = RadarAnimationData(
|
radar_animation = RadarAnimationData(
|
||||||
|
@ -137,10 +136,17 @@ class IrmKmiCoordinator(TimestampDataUpdateCoordinator):
|
||||||
unit=api_data.get('animation', {}).get('unit', {}).get(lang),
|
unit=api_data.get('animation', {}).get('unit', {}).get(lang),
|
||||||
location=localisation
|
location=localisation
|
||||||
)
|
)
|
||||||
rain_graph = await self.create_rain_graph(radar_animation, animation_data, country, images_from_api)
|
rain_graph: RainGraph = await self.create_rain_graph(radar_animation, animation_data, country, images_from_api)
|
||||||
radar_animation['svg_animated'] = rain_graph.get_svg_string()
|
return rain_graph
|
||||||
radar_animation['svg_still'] = rain_graph.get_svg_string(still_image=True)
|
|
||||||
return radar_animation
|
@staticmethod
|
||||||
|
def merge_url_and_params(url, params):
|
||||||
|
parsed_url = urllib.parse.urlparse(url)
|
||||||
|
query_params = urllib.parse.parse_qs(parsed_url.query)
|
||||||
|
query_params.update(params)
|
||||||
|
new_query = urllib.parse.urlencode(query_params, doseq=True)
|
||||||
|
new_url = parsed_url._replace(query=new_query)
|
||||||
|
return str(urllib.parse.urlunparse(new_url))
|
||||||
|
|
||||||
async def _async_pollen_data(self, api_data: dict) -> dict:
|
async def _async_pollen_data(self, api_data: dict) -> dict:
|
||||||
"""Get SVG pollen info from the API, return the pollen data dict"""
|
"""Get SVG pollen info from the API, return the pollen data dict"""
|
||||||
|
@ -179,25 +185,6 @@ class IrmKmiCoordinator(TimestampDataUpdateCoordinator):
|
||||||
country=api_data.get('country')
|
country=api_data.get('country')
|
||||||
)
|
)
|
||||||
|
|
||||||
async def download_images_from_api(self,
|
|
||||||
animation_data: list,
|
|
||||||
country: str,
|
|
||||||
localisation_layer_url: str) -> tuple[Any]:
|
|
||||||
"""Download a batch of images to create the radar frames."""
|
|
||||||
coroutines = list()
|
|
||||||
coroutines.append(
|
|
||||||
self._api_client.get_image(localisation_layer_url,
|
|
||||||
params={'th': 'd' if country == 'NL' or not self._dark_mode else 'n'}))
|
|
||||||
|
|
||||||
for frame in animation_data:
|
|
||||||
if frame.get('uri', None) is not None:
|
|
||||||
coroutines.append(
|
|
||||||
self._api_client.get_image(frame.get('uri'), params={'rs': STYLE_TO_PARAM_MAP[self._style]}))
|
|
||||||
async with async_timeout.timeout(60):
|
|
||||||
images_from_api = await asyncio.gather(*coroutines)
|
|
||||||
|
|
||||||
_LOGGER.debug(f"Just downloaded {len(images_from_api)} images")
|
|
||||||
return images_from_api
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def current_weather_from_data(api_data: dict) -> CurrentWeatherData:
|
async def current_weather_from_data(api_data: dict) -> CurrentWeatherData:
|
||||||
|
@ -457,7 +444,7 @@ class IrmKmiCoordinator(TimestampDataUpdateCoordinator):
|
||||||
radar_animation: RadarAnimationData,
|
radar_animation: RadarAnimationData,
|
||||||
api_animation_data: List[dict],
|
api_animation_data: List[dict],
|
||||||
country: str,
|
country: str,
|
||||||
images_from_api: Tuple[bytes],
|
images_from_api: list[str],
|
||||||
) -> RainGraph:
|
) -> RainGraph:
|
||||||
"""Create a RainGraph object that is ready to output animated and still SVG images"""
|
"""Create a RainGraph object that is ready to output animated and still SVG images"""
|
||||||
sequence: List[AnimationFrameData] = list()
|
sequence: List[AnimationFrameData] = list()
|
||||||
|
@ -494,7 +481,7 @@ class IrmKmiCoordinator(TimestampDataUpdateCoordinator):
|
||||||
bg_size = (640, 490)
|
bg_size = (640, 490)
|
||||||
|
|
||||||
return await RainGraph(radar_animation, image_path, bg_size, tz=tz, config_dir=self.hass.config.config_dir,
|
return await RainGraph(radar_animation, image_path, bg_size, tz=tz, config_dir=self.hass.config.config_dir,
|
||||||
dark_mode=self._dark_mode).build()
|
dark_mode=self._dark_mode, api_client=self._api_client).build()
|
||||||
|
|
||||||
def warnings_from_data(self, warning_data: list | None) -> List[WarningData]:
|
def warnings_from_data(self, warning_data: list | None) -> List[WarningData]:
|
||||||
"""Create a list of warning data instances based on the api data"""
|
"""Create a list of warning data instances based on the api data"""
|
||||||
|
|
|
@ -4,6 +4,8 @@ from typing import List, TypedDict
|
||||||
|
|
||||||
from homeassistant.components.weather import Forecast
|
from homeassistant.components.weather import Forecast
|
||||||
|
|
||||||
|
from .rain_graph import RainGraph
|
||||||
|
|
||||||
|
|
||||||
class IrmKmiForecast(Forecast):
|
class IrmKmiForecast(Forecast):
|
||||||
"""Forecast class with additional attributes for IRM KMI"""
|
"""Forecast class with additional attributes for IRM KMI"""
|
||||||
|
@ -14,13 +16,6 @@ class IrmKmiForecast(Forecast):
|
||||||
sunset: str | None
|
sunset: str | None
|
||||||
|
|
||||||
|
|
||||||
class IrmKmiRadarForecast(Forecast):
|
|
||||||
"""Forecast class to handle rain forecast from the IRM KMI rain radar"""
|
|
||||||
rain_forecast_max: float
|
|
||||||
rain_forecast_min: float
|
|
||||||
might_rain: bool
|
|
||||||
|
|
||||||
|
|
||||||
class CurrentWeatherData(TypedDict, total=False):
|
class CurrentWeatherData(TypedDict, total=False):
|
||||||
"""Class to hold the currently observable weather at a given location"""
|
"""Class to hold the currently observable weather at a given location"""
|
||||||
condition: str | None
|
condition: str | None
|
||||||
|
@ -32,27 +27,6 @@ class CurrentWeatherData(TypedDict, total=False):
|
||||||
pressure: float | None
|
pressure: float | None
|
||||||
|
|
||||||
|
|
||||||
class AnimationFrameData(TypedDict, total=False):
|
|
||||||
"""Holds one single frame of the radar camera, along with the timestamp of the frame"""
|
|
||||||
time: datetime | None
|
|
||||||
image: bytes | None
|
|
||||||
value: float | None
|
|
||||||
position: float | None
|
|
||||||
position_higher: float | None
|
|
||||||
position_lower: float | None
|
|
||||||
|
|
||||||
|
|
||||||
class RadarAnimationData(TypedDict, total=False):
|
|
||||||
"""Holds frames and additional data for the animation to be rendered"""
|
|
||||||
sequence: List[AnimationFrameData] | None
|
|
||||||
most_recent_image_idx: int | None
|
|
||||||
hint: str | None
|
|
||||||
unit: str | None
|
|
||||||
location: bytes | None
|
|
||||||
svg_still: bytes | None
|
|
||||||
svg_animated: bytes | None
|
|
||||||
|
|
||||||
|
|
||||||
class WarningData(TypedDict, total=False):
|
class WarningData(TypedDict, total=False):
|
||||||
"""Holds data about a specific warning"""
|
"""Holds data about a specific warning"""
|
||||||
slug: str
|
slug: str
|
||||||
|
@ -70,7 +44,7 @@ class ProcessedCoordinatorData(TypedDict, total=False):
|
||||||
hourly_forecast: List[Forecast] | None
|
hourly_forecast: List[Forecast] | None
|
||||||
daily_forecast: List[IrmKmiForecast] | None
|
daily_forecast: List[IrmKmiForecast] | None
|
||||||
radar_forecast: List[Forecast] | None
|
radar_forecast: List[Forecast] | None
|
||||||
animation: RadarAnimationData
|
animation: RainGraph | None
|
||||||
warnings: List[WarningData]
|
warnings: List[WarningData]
|
||||||
pollen: dict
|
pollen: dict
|
||||||
country: str
|
country: str
|
||||||
|
|
34
custom_components/irm_kmi/radar_data.py
Normal file
34
custom_components/irm_kmi/radar_data.py
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
"""Data classes related to radar forecast for IRM KMI integration"""
|
||||||
|
# This file was needed to avoid circular import with rain_graph.py and data.py
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import TypedDict, List
|
||||||
|
|
||||||
|
from homeassistant.components.weather import Forecast
|
||||||
|
|
||||||
|
|
||||||
|
class IrmKmiRadarForecast(Forecast):
|
||||||
|
"""Forecast class to handle rain forecast from the IRM KMI rain radar"""
|
||||||
|
rain_forecast_max: float
|
||||||
|
rain_forecast_min: float
|
||||||
|
might_rain: bool
|
||||||
|
|
||||||
|
|
||||||
|
class AnimationFrameData(TypedDict, total=False):
|
||||||
|
"""Holds one single frame of the radar camera, along with the timestamp of the frame"""
|
||||||
|
time: datetime | None
|
||||||
|
image: bytes | str | None
|
||||||
|
value: float | None
|
||||||
|
position: float | None
|
||||||
|
position_higher: float | None
|
||||||
|
position_lower: float | None
|
||||||
|
|
||||||
|
|
||||||
|
class RadarAnimationData(TypedDict, total=False):
|
||||||
|
"""Holds frames and additional data for the animation to be rendered"""
|
||||||
|
sequence: List[AnimationFrameData] | None
|
||||||
|
most_recent_image_idx: int | None
|
||||||
|
hint: str | None
|
||||||
|
unit: str | None
|
||||||
|
location: bytes | str | None
|
||||||
|
svg_still: bytes | None
|
||||||
|
svg_animated: bytes | None
|
|
@ -1,20 +1,21 @@
|
||||||
"""Create graphs for rain short term forecast."""
|
"""Create graphs for rain short term forecast."""
|
||||||
|
import asyncio
|
||||||
import base64
|
import base64
|
||||||
import copy
|
import copy
|
||||||
import datetime
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
from typing import List, Self
|
from typing import List, Self, Any, Coroutine
|
||||||
|
|
||||||
|
import async_timeout
|
||||||
from aiofile import async_open
|
from aiofile import async_open
|
||||||
from homeassistant.util import dt
|
from homeassistant.util import dt
|
||||||
from svgwrite import Drawing
|
from svgwrite import Drawing
|
||||||
from svgwrite.animate import Animate
|
from svgwrite.animate import Animate
|
||||||
from svgwrite.utils import font_mimetype
|
from svgwrite.utils import font_mimetype
|
||||||
|
|
||||||
from custom_components.irm_kmi.data import (AnimationFrameData,
|
from .api import IrmKmiApiClient
|
||||||
RadarAnimationData)
|
from .radar_data import AnimationFrameData, RadarAnimationData
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -34,6 +35,7 @@ class RainGraph:
|
||||||
top_text_y_pos: float = 20,
|
top_text_y_pos: float = 20,
|
||||||
bottom_text_space: float = 50,
|
bottom_text_space: float = 50,
|
||||||
bottom_text_y_pos: float = 218,
|
bottom_text_y_pos: float = 218,
|
||||||
|
api_client: IrmKmiApiClient | None = None
|
||||||
):
|
):
|
||||||
|
|
||||||
self._animation_data: RadarAnimationData = animation_data
|
self._animation_data: RadarAnimationData = animation_data
|
||||||
|
@ -49,6 +51,7 @@ class RainGraph:
|
||||||
self._top_text_y_pos: float = top_text_y_pos + background_size[1]
|
self._top_text_y_pos: float = top_text_y_pos + background_size[1]
|
||||||
self._bottom_text_space: float = bottom_text_space
|
self._bottom_text_space: float = bottom_text_space
|
||||||
self._bottom_text_y_pos: float = bottom_text_y_pos + background_size[1]
|
self._bottom_text_y_pos: float = bottom_text_y_pos + background_size[1]
|
||||||
|
self._api_client = api_client
|
||||||
|
|
||||||
self._frame_count: int = len(self._animation_data['sequence'])
|
self._frame_count: int = len(self._animation_data['sequence'])
|
||||||
self._graph_width: float = self._svg_width - 2 * self._inset
|
self._graph_width: float = self._svg_width - 2 * self._inset
|
||||||
|
@ -64,9 +67,9 @@ class RainGraph:
|
||||||
raise ValueError("bottom_text_y_pos must be below the graph")
|
raise ValueError("bottom_text_y_pos must be below the graph")
|
||||||
|
|
||||||
self._dwg: Drawing = Drawing(size=(self._svg_width, self._svg_height), profile='full')
|
self._dwg: Drawing = Drawing(size=(self._svg_width, self._svg_height), profile='full')
|
||||||
self._dwg_save: Drawing = Drawing()
|
self._dwg_save: Drawing | None = None
|
||||||
self._dwg_animated: Drawing = Drawing()
|
self._dwg_animated: Drawing | None = None
|
||||||
self._dwg_still: Drawing = Drawing()
|
self._dwg_still: Drawing | None = None
|
||||||
|
|
||||||
async def build(self) -> Self:
|
async def build(self) -> Self:
|
||||||
"""Build the rain graph by calling all the method in the right order. Returns self when done"""
|
"""Build the rain graph by calling all the method in the right order. Returns self when done"""
|
||||||
|
@ -78,20 +81,71 @@ class RainGraph:
|
||||||
await self.insert_background()
|
await self.insert_background()
|
||||||
self._dwg_save = copy.deepcopy(self._dwg)
|
self._dwg_save = copy.deepcopy(self._dwg)
|
||||||
|
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def get_animated(self) -> bytes:
|
||||||
|
"""Get the animated SVG. If called for the first time since refresh, downloads the images to build the file."""
|
||||||
|
|
||||||
|
_LOGGER.info(f"Get animated with _dwg_animated {self._dwg_animated}")
|
||||||
|
if self._dwg_animated is None:
|
||||||
|
clouds = self.download_clouds()
|
||||||
|
self._dwg = copy.deepcopy(self._dwg_save)
|
||||||
self.draw_current_fame_line()
|
self.draw_current_fame_line()
|
||||||
self.draw_description_text()
|
self.draw_description_text()
|
||||||
|
await clouds
|
||||||
self.insert_cloud_layer()
|
self.insert_cloud_layer()
|
||||||
self.draw_location()
|
await self.draw_location()
|
||||||
self._dwg_animated = self._dwg
|
self._dwg_animated = self._dwg
|
||||||
|
return self.get_svg_string(still_image=False)
|
||||||
|
|
||||||
self._dwg = self._dwg_save
|
async def get_still(self) -> bytes:
|
||||||
|
"""Get the animated SVG. If called for the first time since refresh, downloads the images to build the file."""
|
||||||
|
_LOGGER.info(f"Get still with _dwg_still {self._dwg_still}")
|
||||||
|
|
||||||
|
if self._dwg_still is None:
|
||||||
idx = self._animation_data['most_recent_image_idx']
|
idx = self._animation_data['most_recent_image_idx']
|
||||||
|
cloud = self.download_clouds(idx)
|
||||||
|
self._dwg = copy.deepcopy(self._dwg_save)
|
||||||
self.draw_current_fame_line(idx)
|
self.draw_current_fame_line(idx)
|
||||||
self.draw_description_text(idx)
|
self.draw_description_text(idx)
|
||||||
|
await cloud
|
||||||
self.insert_cloud_layer(idx)
|
self.insert_cloud_layer(idx)
|
||||||
self.draw_location()
|
await self.draw_location()
|
||||||
self._dwg_still = self._dwg
|
self._dwg_still = self._dwg
|
||||||
return self
|
return self.get_svg_string(still_image=True)
|
||||||
|
|
||||||
|
async def download_clouds(self, idx = None):
|
||||||
|
imgs = [e['image'] for e in self._animation_data['sequence']]
|
||||||
|
|
||||||
|
if idx is not None and type(imgs[idx]) is str:
|
||||||
|
_LOGGER.info("Download single cloud image")
|
||||||
|
result = await self.download_images_from_api([imgs[idx]])
|
||||||
|
self._animation_data['sequence'][idx]['image'] = result[0]
|
||||||
|
|
||||||
|
else:
|
||||||
|
_LOGGER.info("Download many cloud images")
|
||||||
|
|
||||||
|
result = await self.download_images_from_api([img for img in imgs if type(img) is str])
|
||||||
|
|
||||||
|
for i in range(len(self._animation_data['sequence'])):
|
||||||
|
if type(self._animation_data['sequence'][i]['image']) is str:
|
||||||
|
self._animation_data['sequence'][i]['image'] = result[0]
|
||||||
|
result = result[1:]
|
||||||
|
|
||||||
|
async def download_images_from_api(self, urls: list[str]) -> list[Any]:
|
||||||
|
"""Download a batch of images to create the radar frames."""
|
||||||
|
coroutines = list()
|
||||||
|
|
||||||
|
for url in urls:
|
||||||
|
coroutines.append(self._api_client.get_image(url))
|
||||||
|
async with async_timeout.timeout(60):
|
||||||
|
images_from_api = await asyncio.gather(*coroutines)
|
||||||
|
|
||||||
|
_LOGGER.info(f"Just downloaded {len(images_from_api)} images")
|
||||||
|
return images_from_api
|
||||||
|
|
||||||
|
def get_hint(self) -> str:
|
||||||
|
return self._animation_data.get('hint', None)
|
||||||
|
|
||||||
async def draw_svg_frame(self):
|
async def draw_svg_frame(self):
|
||||||
"""Create the global area to draw the other items"""
|
"""Create the global area to draw the other items"""
|
||||||
|
@ -342,8 +396,14 @@ class RainGraph:
|
||||||
repeatCount="indefinite"
|
repeatCount="indefinite"
|
||||||
))
|
))
|
||||||
|
|
||||||
def draw_location(self):
|
async def draw_location(self):
|
||||||
img = self._animation_data['location']
|
img = self._animation_data['location']
|
||||||
|
|
||||||
|
_LOGGER.info(f"Draw location layer with img of type {type(img)}")
|
||||||
|
if type(img) is str:
|
||||||
|
result = await self.download_images_from_api([img])
|
||||||
|
img = result[0]
|
||||||
|
self._animation_data['location'] = img
|
||||||
png_data = base64.b64encode(img).decode('utf-8')
|
png_data = base64.b64encode(img).decode('utf-8')
|
||||||
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
|
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
|
||||||
self._dwg.add(image)
|
self._dwg.add(image)
|
||||||
|
|
|
@ -11,10 +11,10 @@ from pytest_homeassistant_custom_component.common import MockConfigEntry
|
||||||
from custom_components.irm_kmi.const import CONF_LANGUAGE_OVERRIDE
|
from custom_components.irm_kmi.const import CONF_LANGUAGE_OVERRIDE
|
||||||
from custom_components.irm_kmi.coordinator import IrmKmiCoordinator
|
from custom_components.irm_kmi.coordinator import IrmKmiCoordinator
|
||||||
from custom_components.irm_kmi.data import (CurrentWeatherData, IrmKmiForecast,
|
from custom_components.irm_kmi.data import (CurrentWeatherData, IrmKmiForecast,
|
||||||
IrmKmiRadarForecast,
|
ProcessedCoordinatorData)
|
||||||
ProcessedCoordinatorData,
|
from custom_components.irm_kmi.radar_data import IrmKmiRadarForecast, RadarAnimationData
|
||||||
RadarAnimationData)
|
|
||||||
from custom_components.irm_kmi.pollen import PollenParser
|
from custom_components.irm_kmi.pollen import PollenParser
|
||||||
|
from custom_components.irm_kmi.rain_graph import RainGraph
|
||||||
from tests.conftest import get_api_data
|
from tests.conftest import get_api_data
|
||||||
|
|
||||||
|
|
||||||
|
@ -230,7 +230,7 @@ async def test_refresh_succeed_even_when_pollen_and_radar_fail(
|
||||||
0,
|
0,
|
||||||
{"latitude": 50.738681639, "longitude": 4.054077148},
|
{"latitude": 50.738681639, "longitude": 4.054077148},
|
||||||
)
|
)
|
||||||
|
hass.config.config_dir = "."
|
||||||
mock_config_entry.add_to_hass(hass)
|
mock_config_entry.add_to_hass(hass)
|
||||||
|
|
||||||
coordinator = IrmKmiCoordinator(hass, mock_config_entry)
|
coordinator = IrmKmiCoordinator(hass, mock_config_entry)
|
||||||
|
@ -239,7 +239,7 @@ async def test_refresh_succeed_even_when_pollen_and_radar_fail(
|
||||||
|
|
||||||
assert result.get('current_weather').get('condition') == ATTR_CONDITION_CLOUDY
|
assert result.get('current_weather').get('condition') == ATTR_CONDITION_CLOUDY
|
||||||
|
|
||||||
assert result.get('animation') == dict()
|
assert result.get('animation').get_hint() == "No rain forecasted shortly"
|
||||||
|
|
||||||
assert result.get('pollen') == PollenParser.get_unavailable_data()
|
assert result.get('pollen') == PollenParser.get_unavailable_data()
|
||||||
|
|
||||||
|
@ -247,7 +247,7 @@ async def test_refresh_succeed_even_when_pollen_and_radar_fail(
|
||||||
current_weather=CurrentWeatherData(),
|
current_weather=CurrentWeatherData(),
|
||||||
daily_forecast=[],
|
daily_forecast=[],
|
||||||
hourly_forecast=[],
|
hourly_forecast=[],
|
||||||
animation=RadarAnimationData(hint="This will remain unchanged"),
|
animation=None,
|
||||||
warnings=[],
|
warnings=[],
|
||||||
pollen={'foo': 'bar'}
|
pollen={'foo': 'bar'}
|
||||||
)
|
)
|
||||||
|
@ -256,7 +256,7 @@ async def test_refresh_succeed_even_when_pollen_and_radar_fail(
|
||||||
|
|
||||||
assert result.get('current_weather').get('condition') == ATTR_CONDITION_CLOUDY
|
assert result.get('current_weather').get('condition') == ATTR_CONDITION_CLOUDY
|
||||||
|
|
||||||
assert result.get('animation').get('hint') == "This will remain unchanged"
|
assert result.get('animation').get_hint() == "No rain forecasted shortly"
|
||||||
|
|
||||||
assert result.get('pollen') == {'foo': 'bar'}
|
assert result.get('pollen') == {'foo': 'bar'}
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,7 @@
|
||||||
import base64
|
import base64
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from custom_components.irm_kmi.data import (AnimationFrameData,
|
from custom_components.irm_kmi.radar_data import AnimationFrameData, RadarAnimationData
|
||||||
RadarAnimationData)
|
|
||||||
from custom_components.irm_kmi.rain_graph import RainGraph
|
from custom_components.irm_kmi.rain_graph import RainGraph
|
||||||
|
|
||||||
|
|
||||||
|
@ -249,7 +248,7 @@ def test_draw_cloud_layer():
|
||||||
assert str_svg.count('width="640"') == 11 # Is also the width of the SVG itself
|
assert str_svg.count('width="640"') == 11 # Is also the width of the SVG itself
|
||||||
|
|
||||||
|
|
||||||
def test_draw_location_layer():
|
async def test_draw_location_layer():
|
||||||
data = get_radar_animation_data()
|
data = get_radar_animation_data()
|
||||||
rain_graph = RainGraph(
|
rain_graph = RainGraph(
|
||||||
animation_data=data,
|
animation_data=data,
|
||||||
|
@ -257,7 +256,7 @@ def test_draw_location_layer():
|
||||||
background_size=(640, 490),
|
background_size=(640, 490),
|
||||||
)
|
)
|
||||||
|
|
||||||
rain_graph.draw_location()
|
await rain_graph.draw_location()
|
||||||
|
|
||||||
str_svg = rain_graph.get_dwg().tostring()
|
str_svg = rain_graph.get_dwg().tostring()
|
||||||
|
|
||||||
|
|
|
@ -10,8 +10,8 @@ from homeassistant.core import HomeAssistant
|
||||||
from pytest_homeassistant_custom_component.common import MockConfigEntry
|
from pytest_homeassistant_custom_component.common import MockConfigEntry
|
||||||
|
|
||||||
from custom_components.irm_kmi import IrmKmiCoordinator, IrmKmiWeather
|
from custom_components.irm_kmi import IrmKmiCoordinator, IrmKmiWeather
|
||||||
from custom_components.irm_kmi.data import (IrmKmiRadarForecast,
|
from custom_components.irm_kmi.data import (ProcessedCoordinatorData)
|
||||||
ProcessedCoordinatorData)
|
from custom_components.irm_kmi.radar_data import IrmKmiRadarForecast
|
||||||
from tests.conftest import get_api_data
|
from tests.conftest import get_api_data
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue