Initial support for rain radar

This commit is contained in:
Jules 2023-12-27 23:31:48 +01:00
parent e3d464e28f
commit 1f97db64ff
Signed by: jdejaegh
GPG key ID: 99D6D184CA66933A
12 changed files with 196 additions and 19 deletions

View file

@ -9,6 +9,7 @@ from datetime import datetime
import aiohttp import aiohttp
import async_timeout import async_timeout
from aiohttp import ClientResponse
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -50,38 +51,36 @@ class IrmKmiApiClient:
coord['lat'] = round(coord['lat'], self.COORD_DECIMALS) coord['lat'] = round(coord['lat'], self.COORD_DECIMALS)
coord['long'] = round(coord['long'], self.COORD_DECIMALS) coord['long'] = round(coord['long'], self.COORD_DECIMALS)
return await self._api_wrapper( response = await self._api_wrapper(params={"s": "getForecasts", "k": _api_key("getForecasts")} | coord)
params={"s": "getForecasts"} | coord return await response.json()
)
async def get_image(self, url, params: dict | None = None) -> bytes:
# TODO support etag and head request before requesting content
r: ClientResponse = await self._api_wrapper(base_url=url, params={} if params is None else params)
return await r.read()
async def _api_wrapper( async def _api_wrapper(
self, self,
params: dict, params: dict,
base_url: str | None = None,
path: str = "", path: str = "",
method: str = "get", method: str = "get",
data: dict | None = None, data: dict | None = None,
headers: dict | None = None headers: dict | None = None,
) -> any: ) -> any:
"""Get information from the API.""" """Get information from the API."""
if 's' not in params:
raise IrmKmiApiParametersError("No query provided as 's' argument for API")
else:
params['k'] = _api_key(params['s'])
try: try:
async with async_timeout.timeout(10): async with async_timeout.timeout(10):
_LOGGER.debug(f"Calling for {params}")
response = await self._session.request( response = await self._session.request(
method=method, method=method,
url=f"{self._base_url}{path}", url=f"{self._base_url if base_url is None else base_url}{path}",
headers=headers, headers=headers,
json=data, json=data,
params=params params=params
) )
_LOGGER.debug(f"API status code {response.status}")
response.raise_for_status() response.raise_for_status()
return await response.json() return response
except asyncio.TimeoutError as exception: except asyncio.TimeoutError as exception:
raise IrmKmiApiCommunicationError( raise IrmKmiApiCommunicationError(

View file

@ -0,0 +1,104 @@
"""Create a radar view for IRM KMI weather"""
# File inspired by https://github.com/jodur/imagesdirectory-camera/blob/main/custom_components/imagedirectory/camera.py
import logging
import os
from homeassistant.components.camera import Camera, async_get_still_stream
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import IrmKmiCoordinator
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback):
"""Set up the camera entry."""
_LOGGER.debug(f'async_setup_entry entry is: {entry}')
coordinator = hass.data[DOMAIN][entry.entry_id]
# await coordinator.async_config_entry_first_refresh()
async_add_entities(
[IrmKmiRadar(coordinator, entry)]
)
class IrmKmiRadar(CoordinatorEntity, Camera):
"""Representation of a local file camera."""
def __init__(self,
coordinator: IrmKmiCoordinator,
entry: ConfigEntry,
) -> None:
"""Initialize Local File Camera component."""
super().__init__(coordinator)
Camera.__init__(self)
self._name = f"Radar {entry.title}"
self._attr_unique_id = entry.entry_id
self._attr_device_info = DeviceInfo(
entry_type=DeviceEntryType.SERVICE,
identifiers={(DOMAIN, entry.entry_id)},
manufacturer="IRM KMI",
name=f"Radar {entry.title}"
)
self._image_index = 0
@property # Baseclass Camera property override
def frame_interval(self) -> float:
"""Return the interval between frames of the mjpeg stream"""
return 0.3
def camera_image(self,
width: int | None = None,
height: int | None = None) -> bytes | None:
images = self.coordinator.data.get('animation', {}).get('images')
if isinstance(images, list) and len(images) > 0:
return images[0]
return None
async def async_camera_image(
self,
width: int | None = None,
height: int | None = None
) -> bytes | None:
"""Return bytes of camera image."""
return self.camera_image()
async def handle_async_still_stream(self, request, interval):
"""Generate an HTTP MJPEG stream from camera images."""
_LOGGER.info("handle_async_still_stream")
self._image_index = 0
return await async_get_still_stream(
request, self.iterate, self.content_type, interval
)
async def handle_async_mjpeg_stream(self, request):
"""Serve an HTTP MJPEG stream from the camera."""
_LOGGER.info("handle_async_mjpeg_stream")
return await self.handle_async_still_stream(request, self.frame_interval)
async def iterate(self) -> bytes | None:
images = self.coordinator.data.get('animation', {}).get('images')
if isinstance(images, list) and len(images) > 0:
r = images[self._image_index]
self._image_index = (self._image_index + 1) % len(images)
return r
return None
@property
def name(self):
"""Return the name of this camera."""
return self._name
@property
def extra_state_attributes(self):
"""Return the camera state attributes."""
attrs = {"hint": self.coordinator.data.get('animation', {}).get('hint')}
return attrs

View file

@ -14,7 +14,7 @@ from homeassistant.components.weather import (ATTR_CONDITION_CLEAR_NIGHT,
from homeassistant.const import Platform from homeassistant.const import Platform
DOMAIN = 'irm_kmi' DOMAIN = 'irm_kmi'
PLATFORMS: list[Platform] = [Platform.WEATHER] PLATFORMS: list[Platform] = [Platform.WEATHER, Platform.CAMERA]
OUT_OF_BENELUX = ["außerhalb der Benelux (Brussels)", OUT_OF_BENELUX = ["außerhalb der Benelux (Brussels)",
"Hors de Belgique (Bxl)", "Hors de Belgique (Bxl)",
"Outside the Benelux (Brussels)", "Outside the Benelux (Brussels)",

View file

@ -1,7 +1,8 @@
"""DataUpdateCoordinator for the IRM KMI integration.""" """DataUpdateCoordinator for the IRM KMI integration."""
import asyncio
import logging import logging
from datetime import datetime, timedelta from datetime import datetime, timedelta
from io import BytesIO
from typing import List from typing import List
import async_timeout import async_timeout
@ -10,6 +11,7 @@ from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE
from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import (DataUpdateCoordinator, from homeassistant.helpers.update_coordinator import (DataUpdateCoordinator,
UpdateFailed) UpdateFailed)
from PIL import Image, ImageDraw, ImageFont
from .api import IrmKmiApiClient, IrmKmiApiError from .api import IrmKmiApiClient, IrmKmiApiError
from .const import IRM_KMI_TO_HA_CONDITION_MAP as CDT_MAP from .const import IRM_KMI_TO_HA_CONDITION_MAP as CDT_MAP
@ -60,7 +62,64 @@ class IrmKmiCoordinator(DataUpdateCoordinator):
if api_data.get('cityName', None) in OUT_OF_BENELUX: if api_data.get('cityName', None) in OUT_OF_BENELUX:
raise UpdateFailed(f"Zone '{self._zone}' is out of Benelux and forecast is only available in the Benelux") raise UpdateFailed(f"Zone '{self._zone}' is out of Benelux and forecast is only available in the Benelux")
return self.process_api_data(api_data) result = self.process_api_data(api_data)
# TODO make such that the most up to date image is specified to entity for static display
return result | await self._async_animation_data(api_data)
async def _async_animation_data(self, api_data: dict) -> dict:
default = {'animation': None}
animation_data = api_data.get('animation', {}).get('sequence')
localisation_layer = api_data.get('animation', {}).get('localisationLayer')
country = api_data.get('country', None)
if animation_data is None or localisation_layer is None or not isinstance(animation_data, list):
return default
coroutines = list()
coroutines.append(self._api_client.get_image(f"{localisation_layer}&th={'d' if country == 'NL' else 'n'}"))
for frame in animation_data:
if frame.get('uri', None) is not None:
coroutines.append(self._api_client.get_image(frame.get('uri')))
try:
async with async_timeout.timeout(20):
r = await asyncio.gather(*coroutines, return_exceptions=True)
except IrmKmiApiError:
_LOGGER.warning(f"Could not get images for weather radar")
return default
_LOGGER.debug(f"Just downloaded {len(r)} images")
if country == 'NL':
background = Image.open("custom_components/irm_kmi/resources/nl.png").convert('RGBA')
else:
background = Image.open("custom_components/irm_kmi/resources/be_bw.png").convert('RGBA')
localisation = Image.open(BytesIO(r[0])).convert('RGBA')
merged_frames = list()
for frame in r[1:]:
layer = Image.open(BytesIO(frame)).convert('RGBA')
temp = Image.alpha_composite(background, layer)
temp = Image.alpha_composite(temp, localisation)
draw = ImageDraw.Draw(temp)
font = ImageFont.truetype("custom_components/irm_kmi/resources/roboto_medium.ttf", 16)
# TODO write actual date time
if country == 'NL':
draw.text((4, 4), "Sample Text", (0, 0, 0), font=font)
else:
draw.text((4, 4), "Sample Text", (255, 255, 255), font=font)
bytes_img = BytesIO()
temp.save(bytes_img, 'png')
merged_frames.append(bytes_img.getvalue())
return {'animation': {
'images': merged_frames,
# TODO support translation for hint
'hint': api_data.get('animation', {}).get('sequenceHint', {}).get('en')
}
}
@staticmethod @staticmethod
def process_api_data(api_data): def process_api_data(api_data):
@ -82,6 +141,7 @@ class IrmKmiCoordinator(DataUpdateCoordinator):
if module.get('type', None) == 'uv': if module.get('type', None) == 'uv':
uv_index = module.get('data', {}).get('levelValue') uv_index = module.get('data', {}).get('levelValue')
# Put everything together # Put everything together
# TODO NL cities have a better 'obs' section, use that for current weather
processed_data = { processed_data = {
'current_weather': { 'current_weather': {
'condition': CDT_MAP.get( 'condition': CDT_MAP.get(

Binary file not shown.

After

Width:  |  Height:  |  Size: 90 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 667 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 74 KiB

Binary file not shown.

View file

@ -1,4 +1,4 @@
""""Support for IRM KMI weather.""" """Support for IRM KMI weather."""
import logging import logging
from typing import List from typing import List
@ -24,7 +24,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_e
_LOGGER.debug(f'async_setup_entry entry is: {entry}') _LOGGER.debug(f'async_setup_entry entry is: {entry}')
coordinator = hass.data[DOMAIN][entry.entry_id] coordinator = hass.data[DOMAIN][entry.entry_id]
await coordinator.async_config_entry_first_refresh() # await coordinator.async_config_entry_first_refresh()
async_add_entities( async_add_entities(
[IrmKmiWeather(coordinator, entry)] [IrmKmiWeather(coordinator, entry)]
) )
@ -37,6 +37,7 @@ class IrmKmiWeather(CoordinatorEntity, WeatherEntity):
entry: ConfigEntry entry: ConfigEntry
) -> None: ) -> None:
super().__init__(coordinator) super().__init__(coordinator)
WeatherEntity.__init__(self)
self._name = entry.title self._name = entry.title
self._attr_unique_id = entry.entry_id self._attr_unique_id = entry.entry_id
self._attr_device_info = DeviceInfo( self._attr_device_info = DeviceInfo(

View file

@ -2,3 +2,4 @@ aiohttp==3.9.1
async_timeout==4.0.3 async_timeout==4.0.3
homeassistant==2023.12.3 homeassistant==2023.12.3
voluptuous==0.13.1 voluptuous==0.13.1
Pillow==10.1.0

View file

@ -78,3 +78,14 @@ def mock_exception_irm_kmi_api(request: pytest.FixtureRequest) -> Generator[None
irm_kmi = irm_kmi_api_mock.return_value irm_kmi = irm_kmi_api_mock.return_value
irm_kmi.get_forecasts_coord.side_effect = IrmKmiApiParametersError irm_kmi.get_forecasts_coord.side_effect = IrmKmiApiParametersError
yield irm_kmi yield irm_kmi
@pytest.fixture()
def mock_coordinator(request: pytest.FixtureRequest) -> Generator[None, MagicMock, None]:
"""Return a mocked IrmKmi api client."""
with patch(
"custom_components.irm_kmi.IrmKmiCoordinator", autospec=True
) as coordinator_mock:
coord = coordinator_mock.return_value
coord._async_animation_data.return_value = {'animation': None}
yield coord

View file

@ -1,6 +1,6 @@
"""Tests for the IRM KMI integration.""" """Tests for the IRM KMI integration."""
from unittest.mock import AsyncMock from unittest.mock import AsyncMock, patch
import pytest import pytest
from homeassistant.config_entries import ConfigEntryState from homeassistant.config_entries import ConfigEntryState
@ -15,6 +15,7 @@ async def test_load_unload_config_entry(
hass: HomeAssistant, hass: HomeAssistant,
mock_config_entry: MockConfigEntry, mock_config_entry: MockConfigEntry,
mock_irm_kmi_api: AsyncMock, mock_irm_kmi_api: AsyncMock,
mock_coordinator: AsyncMock
) -> None: ) -> None:
"""Test the IRM KMI configuration entry loading/unloading.""" """Test the IRM KMI configuration entry loading/unloading."""
hass.states.async_set( hass.states.async_set(