diff --git a/custom_components/irm_kmi/camera.py b/custom_components/irm_kmi/camera.py index 14c7ddd..de8bc7f 100644 --- a/custom_components/irm_kmi/camera.py +++ b/custom_components/irm_kmi/camera.py @@ -55,7 +55,7 @@ class IrmKmiRadar(CoordinatorEntity, Camera): height: int | None = None) -> bytes | None: """Return still image to be used as thumbnail.""" # TODO make it a still image to avoid cuts in playback on the dashboard - return self.coordinator.data.get('animation', {}).get('svg').encode() + return self.coordinator.data.get('animation', {}).get('svg_still') async def async_camera_image( self, @@ -68,18 +68,18 @@ class IrmKmiRadar(CoordinatorEntity, Camera): async def handle_async_still_stream(self, request: web.Request, interval: float) -> web.StreamResponse: """Generate an HTTP MJPEG stream from camera images.""" self._image_index = 0 - return await async_get_still_stream(request, self.iterate, self.content_type, interval) + return await async_get_still_stream(request, self.get_animated_svg, self.content_type, interval) async def handle_async_mjpeg_stream(self, request: web.Request) -> web.StreamResponse: """Serve an HTTP MJPEG stream from the camera.""" return await self.handle_async_still_stream(request, self.frame_interval) - async def iterate(self) -> bytes | None: - """Loop over all the frames when called multiple times.""" + async def get_animated_svg(self) -> bytes | None: + """Returns the animated svg for camera display""" # If this is not done this way, the live view can only be opened once self._image_index = not self._image_index if self._image_index: - return self.camera_image() + return self.coordinator.data.get('animation', {}).get('svg_animated') else: return None diff --git a/custom_components/irm_kmi/coordinator.py b/custom_components/irm_kmi/coordinator.py index 5075ee0..b81eb45 100644 --- a/custom_components/irm_kmi/coordinator.py +++ b/custom_components/irm_kmi/coordinator.py @@ -113,8 +113,9 @@ class IrmKmiCoordinator(DataUpdateCoordinator): unit=api_data.get('animation', {}).get('unit', {}).get(lang), location=localisation ) - svg_str = self.create_rain_graph(radar_animation, animation_data, country, images_from_api) - radar_animation['svg'] = svg_str + rain_graph = self.create_rain_graph(radar_animation, animation_data, country, images_from_api) + radar_animation['svg_animated'] = rain_graph.get_svg_string() + radar_animation['svg_still'] = rain_graph.get_svg_string(still_image=True) return radar_animation async def process_api_data(self, api_data: dict) -> ProcessedCoordinatorData: @@ -146,7 +147,6 @@ class IrmKmiCoordinator(DataUpdateCoordinator): _LOGGER.debug(f"Just downloaded {len(images_from_api)} images") return images_from_api - @staticmethod def current_weather_from_data(api_data: dict) -> CurrentWeatherData: """Parse the API data to build a CurrentWeatherData.""" @@ -300,7 +300,7 @@ class IrmKmiCoordinator(DataUpdateCoordinator): api_animation_data: List[dict], country: str, images_from_api: Tuple[bytes], - ) -> str: + ) -> RainGraph: sequence: List[AnimationFrameData] = list() tz = pytz.timezone(self.hass.config.time_zone) @@ -334,10 +334,6 @@ class IrmKmiCoordinator(DataUpdateCoordinator): f"{'satellite' if satellite_mode else 'black' if self._dark_mode else 'white'}.png") bg_size = (640, 490) - svg_str = RainGraph(radar_animation, image_path, bg_size, - dark_mode=self._dark_mode, - # tz=self.hass.config.time_zone - ).get_svg_string() - - # TODO return value - return svg_str + return RainGraph(radar_animation, image_path, bg_size, + dark_mode=self._dark_mode, + tz=self.hass.config.time_zone) diff --git a/custom_components/irm_kmi/data.py b/custom_components/irm_kmi/data.py index d827b5c..0ef88ca 100644 --- a/custom_components/irm_kmi/data.py +++ b/custom_components/irm_kmi/data.py @@ -33,8 +33,6 @@ class AnimationFrameData(TypedDict, total=False): position: float | None position_higher: float | None position_lower: float | None - rain_graph: bytes | None - merged_image: bytes | None class RadarAnimationData(TypedDict, total=False): @@ -44,7 +42,8 @@ class RadarAnimationData(TypedDict, total=False): hint: str | None unit: str | None location: bytes | None - svg: str | None + svg_still: bytes | None + svg_animated: bytes | None class ProcessedCoordinatorData(TypedDict, total=False): diff --git a/custom_components/irm_kmi/manifest.json b/custom_components/irm_kmi/manifest.json index dfa3668..a212d3f 100644 --- a/custom_components/irm_kmi/manifest.json +++ b/custom_components/irm_kmi/manifest.json @@ -9,7 +9,6 @@ "iot_class": "cloud_polling", "issue_tracker": "https://github.com/jdejaegh/irm-kmi-ha/issues", "requirements": [ - "Pillow==10.1.0", "pytz==2023.3.post1", "svgwrite==1.4.3" ], diff --git a/custom_components/irm_kmi/rain_graph.py b/custom_components/irm_kmi/rain_graph.py index 3f0ba07..5d187da 100644 --- a/custom_components/irm_kmi/rain_graph.py +++ b/custom_components/irm_kmi/rain_graph.py @@ -1,6 +1,7 @@ """Create graphs for rain short term forecast.""" import base64 +import copy from typing import List import pytz @@ -23,8 +24,9 @@ class RainGraph: graph_height: float = 150, top_text_space: float = 30, top_text_y_pos: float = 20, - bottom_text_space: float = 45, - bottom_text_y_pos: float = 215 + bottom_text_space: float = 50, + bottom_text_y_pos: float = 218, + auto=True ): self._animation_data: RadarAnimationData = animation_data @@ -54,16 +56,36 @@ class RainGraph: raise ValueError("bottom_text_y_pos must be below the graph") self._dwg: Drawing = Drawing(size=(self._svg_width, self._svg_height), profile='full') + self._dwg_save: Drawing + self._dwg_animated: Drawing + self._dwg_still: Drawing - self.draw_svg_graph() - self.draw_current_fame_line() + if auto: + self.draw_svg_frame() + self.draw_hour_bars() + self.draw_chances_path() + self.draw_data_line() + self.write_hint() + self.insert_background() + self._dwg_save = copy.deepcopy(self._dwg) - self.draw_description_text() - self.insert_background() - self.insert_cloud_layer() - self.draw_location() + self.draw_current_fame_line() + self.draw_description_text() + self.insert_cloud_layer() + self.draw_location() + self._dwg_animated = self._dwg - def draw_svg_graph(self): + self._dwg = self._dwg_save + idx = self._animation_data['most_recent_image_idx'] + self.draw_current_fame_line(idx) + self.draw_description_text(idx) + self.insert_cloud_layer(idx) + self.draw_location() + self._dwg_still = self._dwg + + self._dwg_animated.saveas("animated_rain.svg") + + def draw_svg_frame(self): """Create the global area to draw the other items""" self._dwg.embed_font(name="Roboto Medium", filename='custom_components/irm_kmi/resources/roboto_medium.ttf') self._dwg.embed_stylesheet(""" @@ -78,18 +100,23 @@ class RainGraph: rx=None, ry=None, fill=fill_color, stroke='none')) - self.draw_hour_bars() - self.draw_chances_path() - self.draw_data_line() - self.write_hint() + def draw_description_text(self, idx: int | None = None): + """For every frame write the amount of precipitation and the time at the top of the graph. + If idx is set, only do it for the given idx""" - def draw_description_text(self): - """For the given frame idx, write the amount of precipitation and the time at the top of the graph""" - - times = [e['time'].astimezone(tz=self._tz).isoformat(sep=' ', timespec='minutes') for e in + times = [e['time'].astimezone(tz=self._tz).strftime('%H:%M') for e in self._animation_data['sequence']] rain_levels = [f"{e['value']}{self._animation_data['unit']}" for e in self._animation_data['sequence']] + if idx is not None: + time = times[idx] + rain_level = rain_levels[idx] + + paragraph = self._dwg.add(self._dwg.g(class_="roboto", )) + + self.write_time_and_rain(paragraph, rain_level, time) + return + for i in range(self._frame_count): time = times[i] rain_level = rain_levels[i] @@ -107,17 +134,19 @@ class RainGraph: repeatCount="indefinite" )) - paragraph.add(self._dwg.text(f"{time}", insert=(self._offset, self._top_text_y_pos), - text_anchor="start", - font_size="14px", - fill="white", - stroke='none')) + self.write_time_and_rain(paragraph, rain_level, time) - paragraph.add(self._dwg.text(f"{rain_level}", insert=(self._svg_width / 2, self._top_text_y_pos), - text_anchor="middle", - font_size="14px", - fill="white", - stroke='none')) + def write_time_and_rain(self, paragraph, rain_level, time): + paragraph.add(self._dwg.text(f"{time}", insert=(self._offset, self._top_text_y_pos), + text_anchor="start", + font_size="16px", + fill="white", + stroke='none')) + paragraph.add(self._dwg.text(f"{rain_level}", insert=(self._svg_width / 2, self._top_text_y_pos), + text_anchor="middle", + font_size="16px", + fill="white", + stroke='none')) def write_hint(self): """Add the hint text at the bottom of the graph""" @@ -127,7 +156,7 @@ class RainGraph: paragraph.add(self._dwg.text(f"{hint}", insert=(self._svg_width / 2, self._bottom_text_y_pos), text_anchor="middle", - font_size="14px", + font_size="16px", fill="white", stroke='none')) @@ -226,12 +255,12 @@ class RainGraph: if is_hour_bar: graph_rect_center_x = x_position - graph_rect_center_y = self._graph_bottom + 15 + graph_rect_center_y = self._graph_bottom + 18 paragraph = self._dwg.add(self._dwg.g(class_="roboto", )) paragraph.add(self._dwg.text(f"{time_image.hour}h", insert=(graph_rect_center_x, graph_rect_center_y), text_anchor="middle", - font_size="14px", + font_size="16px", fill="white", stroke='none')) @@ -241,15 +270,17 @@ class RainGraph: end=(self._graph_width + self._interval_width / 2, self._graph_bottom), stroke='white')) - def draw_current_fame_line(self): + def draw_current_fame_line(self, idx: int | None = None): """Draw a solid white line on the timeline at the position of the given frame index""" - x_position = self._offset + x_position = self._offset if idx is None else self._offset + idx * self._interval_width now = self._dwg.add(self._dwg.line(start=(x_position, self._top_text_space), end=(x_position, self._graph_bottom), id='now', stroke='white', opacity=1, stroke_width=2)) + if idx is not None: + return now.add(self._dwg.animateTransform("translate", "transform", id="now", from_=f"{self._offset} 0", @@ -257,8 +288,8 @@ class RainGraph: dur=f"{self._frame_count * 0.3}s", repeatCount="indefinite")) - def get_svg_string(self): - return self._dwg.tostring() + def get_svg_string(self, still_image: bool = False) -> bytes: + return self._dwg_still.tostring().encode() if still_image else self._dwg_animated.tostring().encode() def insert_background(self): with open(self._background_image_path, 'rb') as f: @@ -266,9 +297,16 @@ class RainGraph: image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size) self._dwg.add(image) - def insert_cloud_layer(self): + def insert_cloud_layer(self, idx: int | None = None): imgs = [e['image'] for e in self._animation_data['sequence']] + if idx is not None: + img = imgs[idx] + png_data = base64.b64encode(img).decode('utf-8') + image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size) + self._dwg.add(image) + return + for i, img in enumerate(imgs): png_data = base64.b64encode(img).decode('utf-8') image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size) @@ -290,3 +328,6 @@ class RainGraph: png_data = base64.b64encode(img).decode('utf-8') image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size) self._dwg.add(image) + + def get_dwg(self): + return copy.deepcopy(self._dwg) \ No newline at end of file