"""Create graphs for rain short term forecast.""" import base64 import copy import logging import os from typing import List import pytz from svgwrite import Drawing from svgwrite.animate import Animate from custom_components.irm_kmi.data import (AnimationFrameData, RadarAnimationData) _LOGGER = logging.getLogger(__name__) class RainGraph: def __init__(self, animation_data: RadarAnimationData, background_image_path: str, background_size: (int, int), config_dir: str = '.', dark_mode: bool = False, tz: str = 'UTC', svg_width: float = 640, inset: float = 20, graph_height: float = 150, top_text_space: float = 30, top_text_y_pos: float = 20, bottom_text_space: float = 50, bottom_text_y_pos: float = 218, auto=True ): self._animation_data: RadarAnimationData = animation_data self._background_image_path: str = background_image_path self._background_size: (int, int) = background_size self._config_dir: str = config_dir self._dark_mode: bool = dark_mode self._tz = pytz.timezone(tz) self._svg_width: float = svg_width self._inset: float = inset self._graph_height: float = graph_height self._top_text_space: float = top_text_space + background_size[1] self._top_text_y_pos: float = top_text_y_pos + background_size[1] self._bottom_text_space: float = bottom_text_space self._bottom_text_y_pos: float = bottom_text_y_pos + background_size[1] self._frame_count: int = len(self._animation_data['sequence']) self._graph_width: float = self._svg_width - 2 * self._inset self._graph_bottom: float = self._top_text_space + self._graph_height self._svg_height: float = self._graph_height + self._top_text_space + self._bottom_text_space self._interval_width: float = self._graph_width / self._frame_count self._offset: float = self._inset + self._interval_width / 2 if not (0 <= self._top_text_y_pos <= self._top_text_space): raise ValueError("It must hold that 0 <= top_text_y_pos <= top_text_space") if not (self._graph_bottom <= self._bottom_text_y_pos <= self._graph_bottom + self._bottom_text_space): raise ValueError("bottom_text_y_pos must be below the graph") self._dwg: Drawing = Drawing(size=(self._svg_width, self._svg_height), profile='full') self._dwg_save: Drawing self._dwg_animated: Drawing self._dwg_still: Drawing if auto: self.draw_svg_frame() self.draw_hour_bars() self.draw_chances_path() self.draw_data_line() self.write_hint() self.insert_background() self._dwg_save = copy.deepcopy(self._dwg) self.draw_current_fame_line() self.draw_description_text() self.insert_cloud_layer() self.draw_location() self._dwg_animated = self._dwg self._dwg = self._dwg_save idx = self._animation_data['most_recent_image_idx'] self.draw_current_fame_line(idx) self.draw_description_text(idx) self.insert_cloud_layer(idx) self.draw_location() self._dwg_still = self._dwg def draw_svg_frame(self): """Create the global area to draw the other items""" font_file = os.path.join(self._config_dir, 'custom_components/irm_kmi/resources/roboto_medium.ttf') _LOGGER.debug(f"Opening font file at {font_file}") self._dwg.embed_font(name="Roboto Medium", filename=font_file) self._dwg.embed_stylesheet(""" .roboto { font-family: "Roboto Medium"; } """) fill_color = '#393C40' if self._dark_mode else '#385E95' self._dwg.add(self._dwg.rect(insert=(0, 0), size=(self._svg_width, self._svg_height), rx=None, ry=None, fill=fill_color, stroke='none')) def draw_description_text(self, idx: int | None = None): """For every frame write the amount of precipitation and the time at the top of the graph. If idx is set, only do it for the given idx""" times = [e['time'].astimezone(tz=self._tz).strftime('%H:%M') for e in self._animation_data['sequence']] rain_levels = [f"{e['value']}{self._animation_data['unit']}" for e in self._animation_data['sequence']] if idx is not None: time = times[idx] rain_level = rain_levels[idx] paragraph = self._dwg.add(self._dwg.g(class_="roboto", )) self.write_time_and_rain(paragraph, rain_level, time) return for i in range(self._frame_count): time = times[i] rain_level = rain_levels[i] paragraph = self._dwg.add(self._dwg.g(class_="roboto", )) values = ['hidden'] * self._frame_count values[i] = 'visible' paragraph.add(Animate( attributeName="visibility", values=";".join(values), dur=f"{self._frame_count * 0.3}s", begin="0s", repeatCount="indefinite" )) self.write_time_and_rain(paragraph, rain_level, time) def write_time_and_rain(self, paragraph, rain_level, time): """Using the paragraph object, write the time and rain level data""" paragraph.add(self._dwg.text(f"{time}", insert=(self._offset, self._top_text_y_pos), text_anchor="start", font_size="16px", fill="white", stroke='none')) paragraph.add(self._dwg.text(f"{rain_level}", insert=(self._svg_width / 2, self._top_text_y_pos), text_anchor="middle", font_size="16px", fill="white", stroke='none')) def write_hint(self): """Add the hint text at the bottom of the graph""" paragraph = self._dwg.add(self._dwg.g(class_="roboto", )) hint = self._animation_data['hint'] paragraph.add(self._dwg.text(f"{hint}", insert=(self._svg_width / 2, self._bottom_text_y_pos), text_anchor="middle", font_size="16px", fill="white", stroke='none')) def draw_chances_path(self): """Draw the prevision margin area around the main forecast line""" list_lower_points = [] list_higher_points = [] rain_list: List[AnimationFrameData] = self._animation_data['sequence'] graph_rect_left = self._offset graph_rect_top = self._top_text_space for i in range(len(rain_list)): position_higher = rain_list[i]['position_higher'] if position_higher is not None: list_higher_points.append((graph_rect_left, graph_rect_top + ( 1.0 - position_higher) * self._graph_height)) graph_rect_left += self._interval_width graph_rect_right = graph_rect_left - self._interval_width for i in range(len(rain_list) - 1, -1, -1): position_lower = rain_list[i]['position_lower'] if position_lower is not None: list_lower_points.append((graph_rect_right, graph_rect_top + ( 1.0 - position_lower) * self._graph_height)) graph_rect_right -= self._interval_width if list_higher_points and list_lower_points: self.draw_chance_precip(list_higher_points, list_lower_points) def draw_chance_precip(self, list_higher_points: List, list_lower_points: List): """Draw the blue solid line representing the actual rain forecast""" precip_higher_chance_path = self._dwg.path(fill='#63c8fa', stroke='none', opacity=.3) list_higher_points[-1] = tuple(list(list_higher_points[-1]) + ['last']) self.set_curved_path(precip_higher_chance_path, list_higher_points + list_lower_points) self._dwg.add(precip_higher_chance_path) @staticmethod def set_curved_path(path, points): """Pushes points on the path by creating a nice curve between them""" if len(points) < 2: return path.push('M', *points[0]) for i in range(1, len(points)): x_mid = (points[i - 1][0] + points[i][0]) / 2 y_mid = (points[i - 1][1] + points[i][1]) / 2 path.push('Q', points[i - 1][0], points[i - 1][1], x_mid, y_mid) if points[i][-1] == 'last' or points[i - 1][-1] == 'last': path.push('Q', points[i][0], points[i][1], points[i][0], points[i][1]) path.push('Q', points[-1][0], points[-1][1], points[-1][0], points[-1][1]) def draw_data_line(self): """Draw the main data line for the rain forecast""" rain_list: List[AnimationFrameData] = self._animation_data['sequence'] graph_rect_left = self._offset graph_rect_top = self._top_text_space entry_list = [] for i in range(len(rain_list)): position = rain_list[i]['position'] entry_list.append( (graph_rect_left, graph_rect_top + (1.0 - position) * self._graph_height)) graph_rect_left += self._interval_width data_line_path = self._dwg.path(fill='none', stroke='#63c8fa', stroke_width=2) self.set_curved_path(data_line_path, entry_list) self._dwg.add(data_line_path) def draw_hour_bars(self): """Draw the small bars at the bottom to represent the time""" hour_bar_height = 8 horizontal_inset = self._offset for (i, rain_item) in enumerate(self._animation_data['sequence']): time_image = rain_item['time'].astimezone(tz=self._tz) is_hour_bar = time_image.minute == 0 x_position = horizontal_inset if i == self._animation_data['most_recent_image_idx']: self._dwg.add(self._dwg.line(start=(x_position, self._top_text_space), end=(x_position, self._graph_bottom), stroke='white', opacity=0.5, stroke_dasharray=4)) self._dwg.add(self._dwg.line(start=(x_position, self._graph_bottom - hour_bar_height), end=(x_position, self._graph_bottom), stroke='white' if is_hour_bar else 'lightgrey', opacity=0.9 if is_hour_bar else 0.7)) if is_hour_bar: graph_rect_center_x = x_position graph_rect_center_y = self._graph_bottom + 18 paragraph = self._dwg.add(self._dwg.g(class_="roboto", )) paragraph.add(self._dwg.text(f"{time_image.hour}h", insert=(graph_rect_center_x, graph_rect_center_y), text_anchor="middle", font_size="16px", fill="white", stroke='none')) horizontal_inset += self._interval_width self._dwg.add(self._dwg.line(start=(self._offset, self._graph_bottom), end=(self._graph_width + self._interval_width / 2, self._graph_bottom), stroke='white')) def draw_current_fame_line(self, idx: int | None = None): """Draw a solid white line on the timeline at the position of the given frame index""" x_position = self._offset if idx is None else self._offset + idx * self._interval_width now = self._dwg.add(self._dwg.line(start=(x_position, self._top_text_space), end=(x_position, self._graph_bottom), id='now', stroke='white', opacity=1, stroke_width=2)) if idx is not None: return now.add(self._dwg.animateTransform("translate", "transform", id="now", from_=f"{self._offset} 0", to=f"{self._graph_width - self._offset} 0", dur=f"{self._frame_count * 0.3}s", repeatCount="indefinite")) def get_svg_string(self, still_image: bool = False) -> bytes: return self._dwg_still.tostring().encode() if still_image else self._dwg_animated.tostring().encode() def insert_background(self): bg_image_path = os.path.join(self._config_dir, self._background_image_path) with open(bg_image_path, 'rb') as f: png_data = base64.b64encode(f.read()).decode('utf-8') image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size) self._dwg.add(image) def insert_cloud_layer(self, idx: int | None = None): imgs = [e['image'] for e in self._animation_data['sequence']] if idx is not None: img = imgs[idx] png_data = base64.b64encode(img).decode('utf-8') image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size) self._dwg.add(image) return for i, img in enumerate(imgs): png_data = base64.b64encode(img).decode('utf-8') image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size) self._dwg.add(image) values = ['hidden'] * self._frame_count values[i] = 'visible' image.add(Animate( attributeName="visibility", values=";".join(values), dur=f"{self._frame_count * 0.3}s", begin="0s", repeatCount="indefinite" )) def draw_location(self): img = self._animation_data['location'] png_data = base64.b64encode(img).decode('utf-8') image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size) self._dwg.add(image) def get_dwg(self): return copy.deepcopy(self._dwg)