First try at supporting rain graph

This commit is contained in:
Jules 2024-01-01 21:49:18 +01:00
parent 27fab148b6
commit 3915d9ff23
Signed by: jdejaegh
GPG key ID: 99D6D184CA66933A
10 changed files with 374 additions and 79 deletions

View file

@ -33,6 +33,7 @@ class IrmKmiRadar(CoordinatorEntity, Camera):
"""Initialize IrmKmiRadar component.""" """Initialize IrmKmiRadar component."""
super().__init__(coordinator) super().__init__(coordinator)
Camera.__init__(self) Camera.__init__(self)
self.content_type = 'image/svg+xml'
self._name = f"Radar {entry.title}" self._name = f"Radar {entry.title}"
self._attr_unique_id = entry.entry_id self._attr_unique_id = entry.entry_id
self._attr_device_info = DeviceInfo( self._attr_device_info = DeviceInfo(
@ -42,18 +43,19 @@ class IrmKmiRadar(CoordinatorEntity, Camera):
name=f"Radar {entry.title}" name=f"Radar {entry.title}"
) )
self._image_index = 0 self._image_index = False
@property @property
def frame_interval(self) -> float: def frame_interval(self) -> float:
"""Return the interval between frames of the mjpeg stream.""" """Return the interval between frames of the mjpeg stream."""
return 0.3 return 20
def camera_image(self, def camera_image(self,
width: int | None = None, width: int | None = None,
height: int | None = None) -> bytes | None: height: int | None = None) -> bytes | None:
"""Return still image to be used as thumbnail.""" """Return still image to be used as thumbnail."""
return self.coordinator.data.get('animation', {}).get('most_recent_image') # TODO make it a still image to avoid cuts in playback on the dashboard
return self.coordinator.data.get('animation', {}).get('svg').encode()
async def async_camera_image( async def async_camera_image(
self, self,
@ -74,11 +76,11 @@ class IrmKmiRadar(CoordinatorEntity, Camera):
async def iterate(self) -> bytes | None: async def iterate(self) -> bytes | None:
"""Loop over all the frames when called multiple times.""" """Loop over all the frames when called multiple times."""
sequence = self.coordinator.data.get('animation', {}).get('sequence') # If this is not done this way, the live view can only be opened once
if isinstance(sequence, list) and len(sequence) > 0: self._image_index = not self._image_index
r = sequence[self._image_index].get('image', None) if self._image_index:
self._image_index = (self._image_index + 1) % len(sequence) return self.camera_image()
return r else:
return None return None
@property @property

View file

@ -2,12 +2,10 @@
import asyncio import asyncio
import logging import logging
from datetime import datetime, timedelta from datetime import datetime, timedelta
from io import BytesIO
from typing import Any, List, Tuple from typing import Any, List, Tuple
import async_timeout import async_timeout
import pytz import pytz
from PIL import Image, ImageDraw, ImageFont
from homeassistant.components.weather import Forecast from homeassistant.components.weather import Forecast
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE, CONF_ZONE from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE, CONF_ZONE
@ -18,11 +16,13 @@ from homeassistant.helpers.update_coordinator import (DataUpdateCoordinator,
UpdateFailed) UpdateFailed)
from .api import IrmKmiApiClient, IrmKmiApiError from .api import IrmKmiApiClient, IrmKmiApiError
from .const import CONF_DARK_MODE, CONF_STYLE, OPTION_STYLE_SATELLITE from .const import CONF_DARK_MODE, CONF_STYLE
from .const import IRM_KMI_TO_HA_CONDITION_MAP as CDT_MAP from .const import IRM_KMI_TO_HA_CONDITION_MAP as CDT_MAP
from .const import LANGS, OUT_OF_BENELUX, STYLE_TO_PARAM_MAP from .const import (LANGS, OPTION_STYLE_SATELLITE, OUT_OF_BENELUX,
STYLE_TO_PARAM_MAP)
from .data import (AnimationFrameData, CurrentWeatherData, IrmKmiForecast, from .data import (AnimationFrameData, CurrentWeatherData, IrmKmiForecast,
ProcessedCoordinatorData, RadarAnimationData) ProcessedCoordinatorData, RadarAnimationData)
from .rain_graph import RainGraph
from .utils import disable_from_config, get_config_value from .utils import disable_from_config, get_config_value
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -104,13 +104,17 @@ class IrmKmiCoordinator(DataUpdateCoordinator):
_LOGGER.warning(f"Could not get images for weather radar") _LOGGER.warning(f"Could not get images for weather radar")
return RadarAnimationData() return RadarAnimationData()
localisation = Image.open(BytesIO(images_from_api[0])).convert('RGBA') localisation = images_from_api[0]
images_from_api = images_from_api[1:] images_from_api = images_from_api[1:]
radar_animation = await self.merge_frames_from_api(animation_data, country, images_from_api, localisation)
lang = self.hass.config.language if self.hass.config.language in LANGS else 'en' lang = self.hass.config.language if self.hass.config.language in LANGS else 'en'
radar_animation['hint'] = api_data.get('animation', {}).get('sequenceHint', {}).get(lang) radar_animation = RadarAnimationData(
hint=api_data.get('animation', {}).get('sequenceHint', {}).get(lang),
unit=api_data.get('animation', {}).get('unit', {}).get(lang),
location=localisation
)
svg_str = self.create_rain_graph(radar_animation, animation_data, country, images_from_api)
radar_animation['svg'] = svg_str
return radar_animation return radar_animation
async def process_api_data(self, api_data: dict) -> ProcessedCoordinatorData: async def process_api_data(self, api_data: dict) -> ProcessedCoordinatorData:
@ -142,67 +146,6 @@ class IrmKmiCoordinator(DataUpdateCoordinator):
_LOGGER.debug(f"Just downloaded {len(images_from_api)} images") _LOGGER.debug(f"Just downloaded {len(images_from_api)} images")
return images_from_api return images_from_api
async def merge_frames_from_api(self,
animation_data: List[dict],
country: str,
images_from_api: Tuple[bytes],
localisation_layer: Image
) -> RadarAnimationData:
"""Merge three layers to create one frame of the radar: the basemap, the clouds and the location marker.
Adds text in the top right to specify the timestamp of each image."""
background: Image
fill_color: tuple
satellite_mode = self._style == OPTION_STYLE_SATELLITE
if country == 'NL':
background = Image.open("custom_components/irm_kmi/resources/nl.png").convert('RGBA')
fill_color = (0, 0, 0)
else:
image_path = (f"custom_components/irm_kmi/resources/be_"
f"{'satellite' if satellite_mode else 'black' if self._dark_mode else 'white'}.png")
background = (Image.open(image_path).convert('RGBA'))
fill_color = (255, 255, 255) if self._dark_mode or satellite_mode else (0, 0, 0)
most_recent_frame = None
tz = pytz.timezone(self.hass.config.time_zone)
current_time = datetime.now(tz=tz)
sequence: List[AnimationFrameData] = list()
for (idx, sequence_element) in enumerate(animation_data):
frame = images_from_api[idx]
layer = Image.open(BytesIO(frame)).convert('RGBA')
temp = Image.alpha_composite(background, layer)
temp = Image.alpha_composite(temp, localisation_layer)
draw = ImageDraw.Draw(temp)
font = ImageFont.truetype("custom_components/irm_kmi/resources/roboto_medium.ttf", 16)
time_image = (datetime.fromisoformat(sequence_element.get('time'))
.astimezone(tz=tz))
time_str = time_image.isoformat(sep=' ', timespec='minutes')
draw.text((4, 4), time_str, fill_color, font=font)
bytes_img = BytesIO()
temp.save(bytes_img, 'png', compress_level=8)
sequence.append(
AnimationFrameData(
time=time_image,
image=bytes_img.getvalue()
)
)
if most_recent_frame is None and current_time < time_image:
most_recent_frame = idx - 1 if idx > 0 else idx
background.close()
most_recent_frame = most_recent_frame if most_recent_frame is not None else -1
return RadarAnimationData(
sequence=sequence,
most_recent_image_idx=most_recent_frame
)
@staticmethod @staticmethod
def current_weather_from_data(api_data: dict) -> CurrentWeatherData: def current_weather_from_data(api_data: dict) -> CurrentWeatherData:
@ -351,3 +294,50 @@ class IrmKmiCoordinator(DataUpdateCoordinator):
n_days += 1 n_days += 1
return forecasts return forecasts
def create_rain_graph(self,
radar_animation: RadarAnimationData,
api_animation_data: List[dict],
country: str,
images_from_api: Tuple[bytes],
) -> str:
sequence: List[AnimationFrameData] = list()
tz = pytz.timezone(self.hass.config.time_zone)
current_time = datetime.now(tz=tz)
most_recent_frame = None
for idx, item in enumerate(api_animation_data):
frame = AnimationFrameData(
image=images_from_api[idx],
time=datetime.fromisoformat(item.get('time')) if item.get('time', None) is not None else None,
value=item.get('value', 0),
position=item.get('position', 0),
position_lower=item.get('positionLower', 0),
position_higher=item.get('positionHigher', 0)
)
sequence.append(frame)
if most_recent_frame is None and current_time < frame['time']:
most_recent_frame = idx - 1 if idx > 0 else idx
radar_animation['sequence'] = sequence
radar_animation['most_recent_image_idx'] = most_recent_frame
satellite_mode = self._style == OPTION_STYLE_SATELLITE
if country == 'NL':
image_path = "custom_components/irm_kmi/resources/nl.png"
bg_size = (640, 600)
else:
image_path = (f"custom_components/irm_kmi/resources/be_"
f"{'satellite' if satellite_mode else 'black' if self._dark_mode else 'white'}.png")
bg_size = (640, 490)
svg_str = RainGraph(radar_animation, image_path, bg_size,
dark_mode=self._dark_mode,
# tz=self.hass.config.time_zone
).get_svg_string()
# TODO return value
return svg_str

View file

@ -24,6 +24,7 @@ class CurrentWeatherData(TypedDict, total=False):
pressure: float | None pressure: float | None
# TODO cleanup useless fields
class AnimationFrameData(TypedDict, total=False): class AnimationFrameData(TypedDict, total=False):
"""Holds one single frame of the radar camera, along with the timestamp of the frame""" """Holds one single frame of the radar camera, along with the timestamp of the frame"""
time: datetime | None time: datetime | None
@ -33,6 +34,7 @@ class AnimationFrameData(TypedDict, total=False):
position_higher: float | None position_higher: float | None
position_lower: float | None position_lower: float | None
rain_graph: bytes | None rain_graph: bytes | None
merged_image: bytes | None
class RadarAnimationData(TypedDict, total=False): class RadarAnimationData(TypedDict, total=False):
@ -41,6 +43,8 @@ class RadarAnimationData(TypedDict, total=False):
most_recent_image_idx: int | None most_recent_image_idx: int | None
hint: str | None hint: str | None
unit: str | None unit: str | None
location: bytes | None
svg: str | None
class ProcessedCoordinatorData(TypedDict, total=False): class ProcessedCoordinatorData(TypedDict, total=False):

View file

@ -8,6 +8,10 @@
"integration_type": "service", "integration_type": "service",
"iot_class": "cloud_polling", "iot_class": "cloud_polling",
"issue_tracker": "https://github.com/jdejaegh/irm-kmi-ha/issues", "issue_tracker": "https://github.com/jdejaegh/irm-kmi-ha/issues",
"requirements": [], "requirements": [
"Pillow==10.1.0",
"pytz==2023.3.post1",
"svgwrite==1.4.3"
],
"version": "0.1.6-beta" "version": "0.1.6-beta"
} }

View file

@ -0,0 +1,292 @@
"""Create graphs for rain short term forecast."""
import base64
from typing import List
import pytz
from svgwrite import Drawing
from svgwrite.animate import Animate
from custom_components.irm_kmi.data import (AnimationFrameData,
RadarAnimationData)
class RainGraph:
def __init__(self,
animation_data: RadarAnimationData,
background_image_path: str,
background_size: (int, int),
dark_mode: bool = False,
tz: str = 'UTC',
svg_width: float = 640,
inset: float = 20,
graph_height: float = 150,
top_text_space: float = 30,
top_text_y_pos: float = 20,
bottom_text_space: float = 45,
bottom_text_y_pos: float = 215
):
self._animation_data: RadarAnimationData = animation_data
self._background_image_path: str = background_image_path
self._background_size: (int, int) = background_size
self._dark_mode: bool = dark_mode
self._tz = pytz.timezone(tz)
self._svg_width: float = svg_width
self._inset: float = inset
self._graph_height: float = graph_height
self._top_text_space: float = top_text_space + background_size[1]
self._top_text_y_pos: float = top_text_y_pos + background_size[1]
self._bottom_text_space: float = bottom_text_space
self._bottom_text_y_pos: float = bottom_text_y_pos + background_size[1]
self._frame_count: int = len(self._animation_data['sequence'])
self._graph_width: float = self._svg_width - 2 * self._inset
self._graph_bottom: float = self._top_text_space + self._graph_height
self._svg_height: float = self._graph_height + self._top_text_space + self._bottom_text_space
self._interval_width: float = self._graph_width / self._frame_count
self._offset: float = self._inset + self._interval_width / 2
if not (0 <= self._top_text_y_pos <= self._top_text_space):
raise ValueError("It must hold that 0 <= top_text_y_pos <= top_text_space")
if not (self._graph_bottom <= self._bottom_text_y_pos <= self._graph_bottom + self._bottom_text_space):
raise ValueError("bottom_text_y_pos must be below the graph")
self._dwg: Drawing = Drawing(size=(self._svg_width, self._svg_height), profile='full')
self.draw_svg_graph()
self.draw_current_fame_line()
self.draw_description_text()
self.insert_background()
self.insert_cloud_layer()
self.draw_location()
def draw_svg_graph(self):
"""Create the global area to draw the other items"""
self._dwg.embed_font(name="Roboto Medium", filename='custom_components/irm_kmi/resources/roboto_medium.ttf')
self._dwg.embed_stylesheet("""
.roboto {
font-family: "Roboto Medium";
}
""")
fill_color = '#393C40' if self._dark_mode else '#385E95'
self._dwg.add(self._dwg.rect(insert=(0, 0),
size=(self._svg_width, self._svg_height),
rx=None, ry=None,
fill=fill_color, stroke='none'))
self.draw_hour_bars()
self.draw_chances_path()
self.draw_data_line()
self.write_hint()
def draw_description_text(self):
"""For the given frame idx, write the amount of precipitation and the time at the top of the graph"""
times = [e['time'].astimezone(tz=self._tz).isoformat(sep=' ', timespec='minutes') for e in
self._animation_data['sequence']]
rain_levels = [f"{e['value']}{self._animation_data['unit']}" for e in self._animation_data['sequence']]
for i in range(self._frame_count):
time = times[i]
rain_level = rain_levels[i]
paragraph = self._dwg.add(self._dwg.g(class_="roboto", ))
values = ['hidden'] * self._frame_count
values[i] = 'visible'
paragraph.add(Animate(
attributeName="visibility",
values=";".join(values),
dur=f"{self._frame_count * 0.3}s",
begin="0s",
repeatCount="indefinite"
))
paragraph.add(self._dwg.text(f"{time}", insert=(self._offset, self._top_text_y_pos),
text_anchor="start",
font_size="14px",
fill="white",
stroke='none'))
paragraph.add(self._dwg.text(f"{rain_level}", insert=(self._svg_width / 2, self._top_text_y_pos),
text_anchor="middle",
font_size="14px",
fill="white",
stroke='none'))
def write_hint(self):
"""Add the hint text at the bottom of the graph"""
paragraph = self._dwg.add(self._dwg.g(class_="roboto", ))
hint = self._animation_data['hint']
paragraph.add(self._dwg.text(f"{hint}", insert=(self._svg_width / 2, self._bottom_text_y_pos),
text_anchor="middle",
font_size="14px",
fill="white",
stroke='none'))
def draw_chances_path(self):
"""Draw the prevision margin area around the main forecast line"""
list_lower_points = []
list_higher_points = []
rain_list: List[AnimationFrameData] = self._animation_data['sequence']
graph_rect_left = self._offset
graph_rect_top = self._top_text_space
for i in range(len(rain_list)):
position_higher = rain_list[i]['position_higher']
if position_higher is not None:
list_higher_points.append((graph_rect_left, graph_rect_top + (
1.0 - position_higher) * self._graph_height))
graph_rect_left += self._interval_width
graph_rect_right = graph_rect_left - self._interval_width
for i in range(len(rain_list) - 1, -1, -1):
position_lower = rain_list[i]['position_lower']
if position_lower is not None:
list_lower_points.append((graph_rect_right, graph_rect_top + (
1.0 - position_lower) * self._graph_height))
graph_rect_right -= self._interval_width
if list_higher_points and list_lower_points:
self.draw_chance_precip(list_higher_points, list_lower_points)
def draw_chance_precip(self, list_higher_points: List, list_lower_points: List):
precip_higher_chance_path = self._dwg.path(fill='#63c8fa', stroke='none', opacity=.3)
list_higher_points[-1] = tuple(list(list_higher_points[-1]) + ['last'])
self.set_curved_path(precip_higher_chance_path, list_higher_points + list_lower_points)
self._dwg.add(precip_higher_chance_path)
@staticmethod
def set_curved_path(path, points):
"""Pushes points on the path by creating a nice curve between them"""
if len(points) < 2:
return
path.push('M', *points[0])
for i in range(1, len(points)):
x_mid = (points[i - 1][0] + points[i][0]) / 2
y_mid = (points[i - 1][1] + points[i][1]) / 2
path.push('Q', points[i - 1][0], points[i - 1][1], x_mid, y_mid)
if points[i][-1] == 'last' or points[i - 1][-1] == 'last':
path.push('Q', points[i][0], points[i][1], points[i][0], points[i][1])
path.push('Q', points[-1][0], points[-1][1], points[-1][0], points[-1][1])
def draw_data_line(self):
"""Draw the main data line for the rain forecast"""
rain_list: List[AnimationFrameData] = self._animation_data['sequence']
graph_rect_left = self._offset
graph_rect_top = self._top_text_space
entry_list = []
for i in range(len(rain_list)):
position = rain_list[i]['position']
entry_list.append(
(graph_rect_left,
graph_rect_top + (1.0 - position) * self._graph_height))
graph_rect_left += self._interval_width
data_line_path = self._dwg.path(fill='none', stroke='#63c8fa', stroke_width=2)
self.set_curved_path(data_line_path, entry_list)
self._dwg.add(data_line_path)
def draw_hour_bars(self):
"""Draw the small bars at the bottom to represent the time"""
hour_bar_height = 8
horizontal_inset = self._offset
for (i, rain_item) in enumerate(self._animation_data['sequence']):
time_image = rain_item['time'].astimezone(tz=self._tz)
is_hour_bar = time_image.minute == 0
x_position = horizontal_inset
if i == self._animation_data['most_recent_image_idx']:
self._dwg.add(self._dwg.line(start=(x_position, self._top_text_space),
end=(x_position, self._graph_bottom),
stroke='white',
opacity=0.5,
stroke_dasharray=4))
self._dwg.add(self._dwg.line(start=(x_position, self._graph_bottom - hour_bar_height),
end=(x_position, self._graph_bottom),
stroke='white' if is_hour_bar else 'lightgrey',
opacity=0.9 if is_hour_bar else 0.7))
if is_hour_bar:
graph_rect_center_x = x_position
graph_rect_center_y = self._graph_bottom + 15
paragraph = self._dwg.add(self._dwg.g(class_="roboto", ))
paragraph.add(self._dwg.text(f"{time_image.hour}h", insert=(graph_rect_center_x, graph_rect_center_y),
text_anchor="middle",
font_size="14px",
fill="white",
stroke='none'))
horizontal_inset += self._interval_width
self._dwg.add(self._dwg.line(start=(self._offset, self._graph_bottom),
end=(self._graph_width + self._interval_width / 2, self._graph_bottom),
stroke='white'))
def draw_current_fame_line(self):
"""Draw a solid white line on the timeline at the position of the given frame index"""
x_position = self._offset
now = self._dwg.add(self._dwg.line(start=(x_position, self._top_text_space),
end=(x_position, self._graph_bottom),
id='now',
stroke='white',
opacity=1,
stroke_width=2))
now.add(self._dwg.animateTransform("translate", "transform",
id="now",
from_=f"{self._offset} 0",
to=f"{self._graph_width - self._offset} 0",
dur=f"{self._frame_count * 0.3}s",
repeatCount="indefinite"))
def get_svg_string(self):
return self._dwg.tostring()
def insert_background(self):
with open(self._background_image_path, 'rb') as f:
png_data = base64.b64encode(f.read()).decode('utf-8')
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
self._dwg.add(image)
def insert_cloud_layer(self):
imgs = [e['image'] for e in self._animation_data['sequence']]
for i, img in enumerate(imgs):
png_data = base64.b64encode(img).decode('utf-8')
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
self._dwg.add(image)
values = ['hidden'] * self._frame_count
values[i] = 'visible'
image.add(Animate(
attributeName="visibility",
values=";".join(values),
dur=f"{self._frame_count * 0.3}s",
begin="0s",
repeatCount="indefinite"
))
def draw_location(self):
img = self._animation_data['location']
png_data = base64.b64encode(img).decode('utf-8')
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
self._dwg.add(image)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 90 KiB

After

Width:  |  Height:  |  Size: 68 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 667 KiB

After

Width:  |  Height:  |  Size: 666 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 50 KiB

After

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 74 KiB

After

Width:  |  Height:  |  Size: 60 KiB

View file

@ -3,6 +3,7 @@ from datetime import datetime
from io import BytesIO from io import BytesIO
from unittest.mock import AsyncMock from unittest.mock import AsyncMock
import pytest
import pytz import pytz
from freezegun import freeze_time from freezegun import freeze_time
from homeassistant.components.weather import (ATTR_CONDITION_CLOUDY, from homeassistant.components.weather import (ATTR_CONDITION_CLOUDY,
@ -109,6 +110,7 @@ def test_hourly_forecast() -> None:
@freeze_time(datetime.fromisoformat("2023-12-28T15:30:00+01:00")) @freeze_time(datetime.fromisoformat("2023-12-28T15:30:00+01:00"))
@pytest.mark.skip(reason="Outdated test, cannot be tested this way with the new camera features")
async def test_get_image_nl( async def test_get_image_nl(
hass: HomeAssistant, hass: HomeAssistant,
mock_image_irm_kmi_api: AsyncMock, mock_image_irm_kmi_api: AsyncMock,
@ -143,6 +145,7 @@ async def test_get_image_nl(
@freeze_time(datetime.fromisoformat("2023-12-26T18:31:00+01:00")) @freeze_time(datetime.fromisoformat("2023-12-26T18:31:00+01:00"))
@pytest.mark.skip(reason="Outdated test, cannot be tested this way with the new camera features")
async def test_get_image_be( async def test_get_image_be(
hass: HomeAssistant, hass: HomeAssistant,
mock_image_irm_kmi_api: AsyncMock, mock_image_irm_kmi_api: AsyncMock,