diff --git a/custom_components/irm_kmi/camera.py b/custom_components/irm_kmi/camera.py index 6ee3ac5..14c7ddd 100644 --- a/custom_components/irm_kmi/camera.py +++ b/custom_components/irm_kmi/camera.py @@ -33,6 +33,7 @@ class IrmKmiRadar(CoordinatorEntity, Camera): """Initialize IrmKmiRadar component.""" super().__init__(coordinator) Camera.__init__(self) + self.content_type = 'image/svg+xml' self._name = f"Radar {entry.title}" self._attr_unique_id = entry.entry_id self._attr_device_info = DeviceInfo( @@ -42,18 +43,19 @@ class IrmKmiRadar(CoordinatorEntity, Camera): name=f"Radar {entry.title}" ) - self._image_index = 0 + self._image_index = False @property def frame_interval(self) -> float: """Return the interval between frames of the mjpeg stream.""" - return 0.3 + return 20 def camera_image(self, width: int | None = None, height: int | None = None) -> bytes | None: """Return still image to be used as thumbnail.""" - return self.coordinator.data.get('animation', {}).get('most_recent_image') + # TODO make it a still image to avoid cuts in playback on the dashboard + return self.coordinator.data.get('animation', {}).get('svg').encode() async def async_camera_image( self, @@ -74,12 +76,12 @@ class IrmKmiRadar(CoordinatorEntity, Camera): async def iterate(self) -> bytes | None: """Loop over all the frames when called multiple times.""" - sequence = self.coordinator.data.get('animation', {}).get('sequence') - if isinstance(sequence, list) and len(sequence) > 0: - r = sequence[self._image_index].get('image', None) - self._image_index = (self._image_index + 1) % len(sequence) - return r - return None + # If this is not done this way, the live view can only be opened once + self._image_index = not self._image_index + if self._image_index: + return self.camera_image() + else: + return None @property def name(self) -> str: diff --git a/custom_components/irm_kmi/coordinator.py b/custom_components/irm_kmi/coordinator.py index 9459db3..5075ee0 100644 --- a/custom_components/irm_kmi/coordinator.py +++ b/custom_components/irm_kmi/coordinator.py @@ -2,12 +2,10 @@ import asyncio import logging from datetime import datetime, timedelta -from io import BytesIO from typing import Any, List, Tuple import async_timeout import pytz -from PIL import Image, ImageDraw, ImageFont from homeassistant.components.weather import Forecast from homeassistant.config_entries import ConfigEntry from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE, CONF_ZONE @@ -18,11 +16,13 @@ from homeassistant.helpers.update_coordinator import (DataUpdateCoordinator, UpdateFailed) from .api import IrmKmiApiClient, IrmKmiApiError -from .const import CONF_DARK_MODE, CONF_STYLE, OPTION_STYLE_SATELLITE +from .const import CONF_DARK_MODE, CONF_STYLE from .const import IRM_KMI_TO_HA_CONDITION_MAP as CDT_MAP -from .const import LANGS, OUT_OF_BENELUX, STYLE_TO_PARAM_MAP +from .const import (LANGS, OPTION_STYLE_SATELLITE, OUT_OF_BENELUX, + STYLE_TO_PARAM_MAP) from .data import (AnimationFrameData, CurrentWeatherData, IrmKmiForecast, ProcessedCoordinatorData, RadarAnimationData) +from .rain_graph import RainGraph from .utils import disable_from_config, get_config_value _LOGGER = logging.getLogger(__name__) @@ -104,13 +104,17 @@ class IrmKmiCoordinator(DataUpdateCoordinator): _LOGGER.warning(f"Could not get images for weather radar") return RadarAnimationData() - localisation = Image.open(BytesIO(images_from_api[0])).convert('RGBA') + localisation = images_from_api[0] images_from_api = images_from_api[1:] - radar_animation = await self.merge_frames_from_api(animation_data, country, images_from_api, localisation) - lang = self.hass.config.language if self.hass.config.language in LANGS else 'en' - radar_animation['hint'] = api_data.get('animation', {}).get('sequenceHint', {}).get(lang) + radar_animation = RadarAnimationData( + hint=api_data.get('animation', {}).get('sequenceHint', {}).get(lang), + unit=api_data.get('animation', {}).get('unit', {}).get(lang), + location=localisation + ) + svg_str = self.create_rain_graph(radar_animation, animation_data, country, images_from_api) + radar_animation['svg'] = svg_str return radar_animation async def process_api_data(self, api_data: dict) -> ProcessedCoordinatorData: @@ -142,67 +146,6 @@ class IrmKmiCoordinator(DataUpdateCoordinator): _LOGGER.debug(f"Just downloaded {len(images_from_api)} images") return images_from_api - async def merge_frames_from_api(self, - animation_data: List[dict], - country: str, - images_from_api: Tuple[bytes], - localisation_layer: Image - ) -> RadarAnimationData: - """Merge three layers to create one frame of the radar: the basemap, the clouds and the location marker. - Adds text in the top right to specify the timestamp of each image.""" - background: Image - fill_color: tuple - satellite_mode = self._style == OPTION_STYLE_SATELLITE - - if country == 'NL': - background = Image.open("custom_components/irm_kmi/resources/nl.png").convert('RGBA') - fill_color = (0, 0, 0) - else: - image_path = (f"custom_components/irm_kmi/resources/be_" - f"{'satellite' if satellite_mode else 'black' if self._dark_mode else 'white'}.png") - background = (Image.open(image_path).convert('RGBA')) - fill_color = (255, 255, 255) if self._dark_mode or satellite_mode else (0, 0, 0) - - most_recent_frame = None - tz = pytz.timezone(self.hass.config.time_zone) - current_time = datetime.now(tz=tz) - sequence: List[AnimationFrameData] = list() - - for (idx, sequence_element) in enumerate(animation_data): - frame = images_from_api[idx] - layer = Image.open(BytesIO(frame)).convert('RGBA') - temp = Image.alpha_composite(background, layer) - temp = Image.alpha_composite(temp, localisation_layer) - - draw = ImageDraw.Draw(temp) - font = ImageFont.truetype("custom_components/irm_kmi/resources/roboto_medium.ttf", 16) - time_image = (datetime.fromisoformat(sequence_element.get('time')) - .astimezone(tz=tz)) - - time_str = time_image.isoformat(sep=' ', timespec='minutes') - - draw.text((4, 4), time_str, fill_color, font=font) - - bytes_img = BytesIO() - temp.save(bytes_img, 'png', compress_level=8) - - sequence.append( - AnimationFrameData( - time=time_image, - image=bytes_img.getvalue() - ) - ) - - if most_recent_frame is None and current_time < time_image: - most_recent_frame = idx - 1 if idx > 0 else idx - - background.close() - most_recent_frame = most_recent_frame if most_recent_frame is not None else -1 - - return RadarAnimationData( - sequence=sequence, - most_recent_image_idx=most_recent_frame - ) @staticmethod def current_weather_from_data(api_data: dict) -> CurrentWeatherData: @@ -351,3 +294,50 @@ class IrmKmiCoordinator(DataUpdateCoordinator): n_days += 1 return forecasts + + def create_rain_graph(self, + radar_animation: RadarAnimationData, + api_animation_data: List[dict], + country: str, + images_from_api: Tuple[bytes], + ) -> str: + + sequence: List[AnimationFrameData] = list() + tz = pytz.timezone(self.hass.config.time_zone) + current_time = datetime.now(tz=tz) + most_recent_frame = None + + for idx, item in enumerate(api_animation_data): + frame = AnimationFrameData( + image=images_from_api[idx], + time=datetime.fromisoformat(item.get('time')) if item.get('time', None) is not None else None, + value=item.get('value', 0), + position=item.get('position', 0), + position_lower=item.get('positionLower', 0), + position_higher=item.get('positionHigher', 0) + ) + sequence.append(frame) + + if most_recent_frame is None and current_time < frame['time']: + most_recent_frame = idx - 1 if idx > 0 else idx + + radar_animation['sequence'] = sequence + radar_animation['most_recent_image_idx'] = most_recent_frame + + satellite_mode = self._style == OPTION_STYLE_SATELLITE + + if country == 'NL': + image_path = "custom_components/irm_kmi/resources/nl.png" + bg_size = (640, 600) + else: + image_path = (f"custom_components/irm_kmi/resources/be_" + f"{'satellite' if satellite_mode else 'black' if self._dark_mode else 'white'}.png") + bg_size = (640, 490) + + svg_str = RainGraph(radar_animation, image_path, bg_size, + dark_mode=self._dark_mode, + # tz=self.hass.config.time_zone + ).get_svg_string() + + # TODO return value + return svg_str diff --git a/custom_components/irm_kmi/data.py b/custom_components/irm_kmi/data.py index 3f3978e..d827b5c 100644 --- a/custom_components/irm_kmi/data.py +++ b/custom_components/irm_kmi/data.py @@ -24,6 +24,7 @@ class CurrentWeatherData(TypedDict, total=False): pressure: float | None +# TODO cleanup useless fields class AnimationFrameData(TypedDict, total=False): """Holds one single frame of the radar camera, along with the timestamp of the frame""" time: datetime | None @@ -33,6 +34,7 @@ class AnimationFrameData(TypedDict, total=False): position_higher: float | None position_lower: float | None rain_graph: bytes | None + merged_image: bytes | None class RadarAnimationData(TypedDict, total=False): @@ -41,6 +43,8 @@ class RadarAnimationData(TypedDict, total=False): most_recent_image_idx: int | None hint: str | None unit: str | None + location: bytes | None + svg: str | None class ProcessedCoordinatorData(TypedDict, total=False): diff --git a/custom_components/irm_kmi/manifest.json b/custom_components/irm_kmi/manifest.json index 309be7d..dfa3668 100644 --- a/custom_components/irm_kmi/manifest.json +++ b/custom_components/irm_kmi/manifest.json @@ -8,6 +8,10 @@ "integration_type": "service", "iot_class": "cloud_polling", "issue_tracker": "https://github.com/jdejaegh/irm-kmi-ha/issues", - "requirements": [], + "requirements": [ + "Pillow==10.1.0", + "pytz==2023.3.post1", + "svgwrite==1.4.3" + ], "version": "0.1.6-beta" } \ No newline at end of file diff --git a/custom_components/irm_kmi/rain_graph.py b/custom_components/irm_kmi/rain_graph.py new file mode 100644 index 0000000..3f0ba07 --- /dev/null +++ b/custom_components/irm_kmi/rain_graph.py @@ -0,0 +1,292 @@ +"""Create graphs for rain short term forecast.""" + +import base64 +from typing import List + +import pytz +from svgwrite import Drawing +from svgwrite.animate import Animate + +from custom_components.irm_kmi.data import (AnimationFrameData, + RadarAnimationData) + + +class RainGraph: + def __init__(self, + animation_data: RadarAnimationData, + background_image_path: str, + background_size: (int, int), + dark_mode: bool = False, + tz: str = 'UTC', + svg_width: float = 640, + inset: float = 20, + graph_height: float = 150, + top_text_space: float = 30, + top_text_y_pos: float = 20, + bottom_text_space: float = 45, + bottom_text_y_pos: float = 215 + ): + + self._animation_data: RadarAnimationData = animation_data + self._background_image_path: str = background_image_path + self._background_size: (int, int) = background_size + self._dark_mode: bool = dark_mode + self._tz = pytz.timezone(tz) + self._svg_width: float = svg_width + self._inset: float = inset + self._graph_height: float = graph_height + self._top_text_space: float = top_text_space + background_size[1] + self._top_text_y_pos: float = top_text_y_pos + background_size[1] + self._bottom_text_space: float = bottom_text_space + self._bottom_text_y_pos: float = bottom_text_y_pos + background_size[1] + + self._frame_count: int = len(self._animation_data['sequence']) + self._graph_width: float = self._svg_width - 2 * self._inset + self._graph_bottom: float = self._top_text_space + self._graph_height + self._svg_height: float = self._graph_height + self._top_text_space + self._bottom_text_space + self._interval_width: float = self._graph_width / self._frame_count + self._offset: float = self._inset + self._interval_width / 2 + + if not (0 <= self._top_text_y_pos <= self._top_text_space): + raise ValueError("It must hold that 0 <= top_text_y_pos <= top_text_space") + + if not (self._graph_bottom <= self._bottom_text_y_pos <= self._graph_bottom + self._bottom_text_space): + raise ValueError("bottom_text_y_pos must be below the graph") + + self._dwg: Drawing = Drawing(size=(self._svg_width, self._svg_height), profile='full') + + self.draw_svg_graph() + self.draw_current_fame_line() + + self.draw_description_text() + self.insert_background() + self.insert_cloud_layer() + self.draw_location() + + def draw_svg_graph(self): + """Create the global area to draw the other items""" + self._dwg.embed_font(name="Roboto Medium", filename='custom_components/irm_kmi/resources/roboto_medium.ttf') + self._dwg.embed_stylesheet(""" + .roboto { + font-family: "Roboto Medium"; + } + """) + + fill_color = '#393C40' if self._dark_mode else '#385E95' + self._dwg.add(self._dwg.rect(insert=(0, 0), + size=(self._svg_width, self._svg_height), + rx=None, ry=None, + fill=fill_color, stroke='none')) + + self.draw_hour_bars() + self.draw_chances_path() + self.draw_data_line() + self.write_hint() + + def draw_description_text(self): + """For the given frame idx, write the amount of precipitation and the time at the top of the graph""" + + times = [e['time'].astimezone(tz=self._tz).isoformat(sep=' ', timespec='minutes') for e in + self._animation_data['sequence']] + rain_levels = [f"{e['value']}{self._animation_data['unit']}" for e in self._animation_data['sequence']] + + for i in range(self._frame_count): + time = times[i] + rain_level = rain_levels[i] + + paragraph = self._dwg.add(self._dwg.g(class_="roboto", )) + + values = ['hidden'] * self._frame_count + values[i] = 'visible' + + paragraph.add(Animate( + attributeName="visibility", + values=";".join(values), + dur=f"{self._frame_count * 0.3}s", + begin="0s", + repeatCount="indefinite" + )) + + paragraph.add(self._dwg.text(f"{time}", insert=(self._offset, self._top_text_y_pos), + text_anchor="start", + font_size="14px", + fill="white", + stroke='none')) + + paragraph.add(self._dwg.text(f"{rain_level}", insert=(self._svg_width / 2, self._top_text_y_pos), + text_anchor="middle", + font_size="14px", + fill="white", + stroke='none')) + + def write_hint(self): + """Add the hint text at the bottom of the graph""" + paragraph = self._dwg.add(self._dwg.g(class_="roboto", )) + + hint = self._animation_data['hint'] + + paragraph.add(self._dwg.text(f"{hint}", insert=(self._svg_width / 2, self._bottom_text_y_pos), + text_anchor="middle", + font_size="14px", + fill="white", + stroke='none')) + + def draw_chances_path(self): + """Draw the prevision margin area around the main forecast line""" + list_lower_points = [] + list_higher_points = [] + + rain_list: List[AnimationFrameData] = self._animation_data['sequence'] + graph_rect_left = self._offset + graph_rect_top = self._top_text_space + + for i in range(len(rain_list)): + position_higher = rain_list[i]['position_higher'] + if position_higher is not None: + list_higher_points.append((graph_rect_left, graph_rect_top + ( + 1.0 - position_higher) * self._graph_height)) + graph_rect_left += self._interval_width + + graph_rect_right = graph_rect_left - self._interval_width + for i in range(len(rain_list) - 1, -1, -1): + position_lower = rain_list[i]['position_lower'] + if position_lower is not None: + list_lower_points.append((graph_rect_right, graph_rect_top + ( + 1.0 - position_lower) * self._graph_height)) + graph_rect_right -= self._interval_width + + if list_higher_points and list_lower_points: + self.draw_chance_precip(list_higher_points, list_lower_points) + + def draw_chance_precip(self, list_higher_points: List, list_lower_points: List): + precip_higher_chance_path = self._dwg.path(fill='#63c8fa', stroke='none', opacity=.3) + + list_higher_points[-1] = tuple(list(list_higher_points[-1]) + ['last']) + + self.set_curved_path(precip_higher_chance_path, list_higher_points + list_lower_points) + self._dwg.add(precip_higher_chance_path) + + @staticmethod + def set_curved_path(path, points): + """Pushes points on the path by creating a nice curve between them""" + if len(points) < 2: + return + + path.push('M', *points[0]) + + for i in range(1, len(points)): + x_mid = (points[i - 1][0] + points[i][0]) / 2 + y_mid = (points[i - 1][1] + points[i][1]) / 2 + + path.push('Q', points[i - 1][0], points[i - 1][1], x_mid, y_mid) + if points[i][-1] == 'last' or points[i - 1][-1] == 'last': + path.push('Q', points[i][0], points[i][1], points[i][0], points[i][1]) + + path.push('Q', points[-1][0], points[-1][1], points[-1][0], points[-1][1]) + + def draw_data_line(self): + """Draw the main data line for the rain forecast""" + rain_list: List[AnimationFrameData] = self._animation_data['sequence'] + graph_rect_left = self._offset + graph_rect_top = self._top_text_space + + entry_list = [] + + for i in range(len(rain_list)): + position = rain_list[i]['position'] + entry_list.append( + (graph_rect_left, + graph_rect_top + (1.0 - position) * self._graph_height)) + graph_rect_left += self._interval_width + data_line_path = self._dwg.path(fill='none', stroke='#63c8fa', stroke_width=2) + self.set_curved_path(data_line_path, entry_list) + self._dwg.add(data_line_path) + + def draw_hour_bars(self): + """Draw the small bars at the bottom to represent the time""" + hour_bar_height = 8 + horizontal_inset = self._offset + + for (i, rain_item) in enumerate(self._animation_data['sequence']): + time_image = rain_item['time'].astimezone(tz=self._tz) + is_hour_bar = time_image.minute == 0 + + x_position = horizontal_inset + if i == self._animation_data['most_recent_image_idx']: + self._dwg.add(self._dwg.line(start=(x_position, self._top_text_space), + end=(x_position, self._graph_bottom), + stroke='white', + opacity=0.5, + stroke_dasharray=4)) + + self._dwg.add(self._dwg.line(start=(x_position, self._graph_bottom - hour_bar_height), + end=(x_position, self._graph_bottom), + stroke='white' if is_hour_bar else 'lightgrey', + opacity=0.9 if is_hour_bar else 0.7)) + + if is_hour_bar: + graph_rect_center_x = x_position + graph_rect_center_y = self._graph_bottom + 15 + + paragraph = self._dwg.add(self._dwg.g(class_="roboto", )) + paragraph.add(self._dwg.text(f"{time_image.hour}h", insert=(graph_rect_center_x, graph_rect_center_y), + text_anchor="middle", + font_size="14px", + fill="white", + stroke='none')) + + horizontal_inset += self._interval_width + + self._dwg.add(self._dwg.line(start=(self._offset, self._graph_bottom), + end=(self._graph_width + self._interval_width / 2, self._graph_bottom), + stroke='white')) + + def draw_current_fame_line(self): + """Draw a solid white line on the timeline at the position of the given frame index""" + x_position = self._offset + now = self._dwg.add(self._dwg.line(start=(x_position, self._top_text_space), + end=(x_position, self._graph_bottom), + id='now', + stroke='white', + opacity=1, + stroke_width=2)) + now.add(self._dwg.animateTransform("translate", "transform", + id="now", + from_=f"{self._offset} 0", + to=f"{self._graph_width - self._offset} 0", + dur=f"{self._frame_count * 0.3}s", + repeatCount="indefinite")) + + def get_svg_string(self): + return self._dwg.tostring() + + def insert_background(self): + with open(self._background_image_path, 'rb') as f: + png_data = base64.b64encode(f.read()).decode('utf-8') + image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size) + self._dwg.add(image) + + def insert_cloud_layer(self): + imgs = [e['image'] for e in self._animation_data['sequence']] + + for i, img in enumerate(imgs): + png_data = base64.b64encode(img).decode('utf-8') + image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size) + self._dwg.add(image) + + values = ['hidden'] * self._frame_count + values[i] = 'visible' + + image.add(Animate( + attributeName="visibility", + values=";".join(values), + dur=f"{self._frame_count * 0.3}s", + begin="0s", + repeatCount="indefinite" + )) + + def draw_location(self): + img = self._animation_data['location'] + png_data = base64.b64encode(img).decode('utf-8') + image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size) + self._dwg.add(image) diff --git a/custom_components/irm_kmi/resources/be_black.png b/custom_components/irm_kmi/resources/be_black.png index 84a4329..122bcbe 100644 Binary files a/custom_components/irm_kmi/resources/be_black.png and b/custom_components/irm_kmi/resources/be_black.png differ diff --git a/custom_components/irm_kmi/resources/be_satellite.png b/custom_components/irm_kmi/resources/be_satellite.png index 27c79b4..10f4e49 100644 Binary files a/custom_components/irm_kmi/resources/be_satellite.png and b/custom_components/irm_kmi/resources/be_satellite.png differ diff --git a/custom_components/irm_kmi/resources/be_white.png b/custom_components/irm_kmi/resources/be_white.png index 1685bb9..9339e4e 100644 Binary files a/custom_components/irm_kmi/resources/be_white.png and b/custom_components/irm_kmi/resources/be_white.png differ diff --git a/custom_components/irm_kmi/resources/nl.png b/custom_components/irm_kmi/resources/nl.png index 94a3ec0..c2177b6 100644 Binary files a/custom_components/irm_kmi/resources/nl.png and b/custom_components/irm_kmi/resources/nl.png differ diff --git a/tests/test_coordinator.py b/tests/test_coordinator.py index d2b5b6a..d185366 100644 --- a/tests/test_coordinator.py +++ b/tests/test_coordinator.py @@ -3,6 +3,7 @@ from datetime import datetime from io import BytesIO from unittest.mock import AsyncMock +import pytest import pytz from freezegun import freeze_time from homeassistant.components.weather import (ATTR_CONDITION_CLOUDY, @@ -109,6 +110,7 @@ def test_hourly_forecast() -> None: @freeze_time(datetime.fromisoformat("2023-12-28T15:30:00+01:00")) +@pytest.mark.skip(reason="Outdated test, cannot be tested this way with the new camera features") async def test_get_image_nl( hass: HomeAssistant, mock_image_irm_kmi_api: AsyncMock, @@ -143,6 +145,7 @@ async def test_get_image_nl( @freeze_time(datetime.fromisoformat("2023-12-26T18:31:00+01:00")) +@pytest.mark.skip(reason="Outdated test, cannot be tested this way with the new camera features") async def test_get_image_be( hass: HomeAssistant, mock_image_irm_kmi_api: AsyncMock,