Refactor SVG camera

This commit is contained in:
Jules 2024-01-02 20:45:14 +01:00
parent 3915d9ff23
commit c9ec30b8b2
Signed by: jdejaegh
GPG key ID: 99D6D184CA66933A
5 changed files with 90 additions and 55 deletions

View file

@ -55,7 +55,7 @@ class IrmKmiRadar(CoordinatorEntity, Camera):
height: int | None = None) -> bytes | None:
"""Return still image to be used as thumbnail."""
# TODO make it a still image to avoid cuts in playback on the dashboard
return self.coordinator.data.get('animation', {}).get('svg').encode()
return self.coordinator.data.get('animation', {}).get('svg_still')
async def async_camera_image(
self,
@ -68,18 +68,18 @@ class IrmKmiRadar(CoordinatorEntity, Camera):
async def handle_async_still_stream(self, request: web.Request, interval: float) -> web.StreamResponse:
"""Generate an HTTP MJPEG stream from camera images."""
self._image_index = 0
return await async_get_still_stream(request, self.iterate, self.content_type, interval)
return await async_get_still_stream(request, self.get_animated_svg, self.content_type, interval)
async def handle_async_mjpeg_stream(self, request: web.Request) -> web.StreamResponse:
"""Serve an HTTP MJPEG stream from the camera."""
return await self.handle_async_still_stream(request, self.frame_interval)
async def iterate(self) -> bytes | None:
"""Loop over all the frames when called multiple times."""
async def get_animated_svg(self) -> bytes | None:
"""Returns the animated svg for camera display"""
# If this is not done this way, the live view can only be opened once
self._image_index = not self._image_index
if self._image_index:
return self.camera_image()
return self.coordinator.data.get('animation', {}).get('svg_animated')
else:
return None

View file

@ -113,8 +113,9 @@ class IrmKmiCoordinator(DataUpdateCoordinator):
unit=api_data.get('animation', {}).get('unit', {}).get(lang),
location=localisation
)
svg_str = self.create_rain_graph(radar_animation, animation_data, country, images_from_api)
radar_animation['svg'] = svg_str
rain_graph = self.create_rain_graph(radar_animation, animation_data, country, images_from_api)
radar_animation['svg_animated'] = rain_graph.get_svg_string()
radar_animation['svg_still'] = rain_graph.get_svg_string(still_image=True)
return radar_animation
async def process_api_data(self, api_data: dict) -> ProcessedCoordinatorData:
@ -146,7 +147,6 @@ class IrmKmiCoordinator(DataUpdateCoordinator):
_LOGGER.debug(f"Just downloaded {len(images_from_api)} images")
return images_from_api
@staticmethod
def current_weather_from_data(api_data: dict) -> CurrentWeatherData:
"""Parse the API data to build a CurrentWeatherData."""
@ -300,7 +300,7 @@ class IrmKmiCoordinator(DataUpdateCoordinator):
api_animation_data: List[dict],
country: str,
images_from_api: Tuple[bytes],
) -> str:
) -> RainGraph:
sequence: List[AnimationFrameData] = list()
tz = pytz.timezone(self.hass.config.time_zone)
@ -334,10 +334,6 @@ class IrmKmiCoordinator(DataUpdateCoordinator):
f"{'satellite' if satellite_mode else 'black' if self._dark_mode else 'white'}.png")
bg_size = (640, 490)
svg_str = RainGraph(radar_animation, image_path, bg_size,
dark_mode=self._dark_mode,
# tz=self.hass.config.time_zone
).get_svg_string()
# TODO return value
return svg_str
return RainGraph(radar_animation, image_path, bg_size,
dark_mode=self._dark_mode,
tz=self.hass.config.time_zone)

View file

@ -33,8 +33,6 @@ class AnimationFrameData(TypedDict, total=False):
position: float | None
position_higher: float | None
position_lower: float | None
rain_graph: bytes | None
merged_image: bytes | None
class RadarAnimationData(TypedDict, total=False):
@ -44,7 +42,8 @@ class RadarAnimationData(TypedDict, total=False):
hint: str | None
unit: str | None
location: bytes | None
svg: str | None
svg_still: bytes | None
svg_animated: bytes | None
class ProcessedCoordinatorData(TypedDict, total=False):

View file

@ -9,7 +9,6 @@
"iot_class": "cloud_polling",
"issue_tracker": "https://github.com/jdejaegh/irm-kmi-ha/issues",
"requirements": [
"Pillow==10.1.0",
"pytz==2023.3.post1",
"svgwrite==1.4.3"
],

View file

@ -1,6 +1,7 @@
"""Create graphs for rain short term forecast."""
import base64
import copy
from typing import List
import pytz
@ -23,8 +24,9 @@ class RainGraph:
graph_height: float = 150,
top_text_space: float = 30,
top_text_y_pos: float = 20,
bottom_text_space: float = 45,
bottom_text_y_pos: float = 215
bottom_text_space: float = 50,
bottom_text_y_pos: float = 218,
auto=True
):
self._animation_data: RadarAnimationData = animation_data
@ -54,16 +56,36 @@ class RainGraph:
raise ValueError("bottom_text_y_pos must be below the graph")
self._dwg: Drawing = Drawing(size=(self._svg_width, self._svg_height), profile='full')
self._dwg_save: Drawing
self._dwg_animated: Drawing
self._dwg_still: Drawing
self.draw_svg_graph()
self.draw_current_fame_line()
if auto:
self.draw_svg_frame()
self.draw_hour_bars()
self.draw_chances_path()
self.draw_data_line()
self.write_hint()
self.insert_background()
self._dwg_save = copy.deepcopy(self._dwg)
self.draw_description_text()
self.insert_background()
self.insert_cloud_layer()
self.draw_location()
self.draw_current_fame_line()
self.draw_description_text()
self.insert_cloud_layer()
self.draw_location()
self._dwg_animated = self._dwg
def draw_svg_graph(self):
self._dwg = self._dwg_save
idx = self._animation_data['most_recent_image_idx']
self.draw_current_fame_line(idx)
self.draw_description_text(idx)
self.insert_cloud_layer(idx)
self.draw_location()
self._dwg_still = self._dwg
self._dwg_animated.saveas("animated_rain.svg")
def draw_svg_frame(self):
"""Create the global area to draw the other items"""
self._dwg.embed_font(name="Roboto Medium", filename='custom_components/irm_kmi/resources/roboto_medium.ttf')
self._dwg.embed_stylesheet("""
@ -78,18 +100,23 @@ class RainGraph:
rx=None, ry=None,
fill=fill_color, stroke='none'))
self.draw_hour_bars()
self.draw_chances_path()
self.draw_data_line()
self.write_hint()
def draw_description_text(self, idx: int | None = None):
"""For every frame write the amount of precipitation and the time at the top of the graph.
If idx is set, only do it for the given idx"""
def draw_description_text(self):
"""For the given frame idx, write the amount of precipitation and the time at the top of the graph"""
times = [e['time'].astimezone(tz=self._tz).isoformat(sep=' ', timespec='minutes') for e in
times = [e['time'].astimezone(tz=self._tz).strftime('%H:%M') for e in
self._animation_data['sequence']]
rain_levels = [f"{e['value']}{self._animation_data['unit']}" for e in self._animation_data['sequence']]
if idx is not None:
time = times[idx]
rain_level = rain_levels[idx]
paragraph = self._dwg.add(self._dwg.g(class_="roboto", ))
self.write_time_and_rain(paragraph, rain_level, time)
return
for i in range(self._frame_count):
time = times[i]
rain_level = rain_levels[i]
@ -107,17 +134,19 @@ class RainGraph:
repeatCount="indefinite"
))
paragraph.add(self._dwg.text(f"{time}", insert=(self._offset, self._top_text_y_pos),
text_anchor="start",
font_size="14px",
fill="white",
stroke='none'))
self.write_time_and_rain(paragraph, rain_level, time)
paragraph.add(self._dwg.text(f"{rain_level}", insert=(self._svg_width / 2, self._top_text_y_pos),
text_anchor="middle",
font_size="14px",
fill="white",
stroke='none'))
def write_time_and_rain(self, paragraph, rain_level, time):
paragraph.add(self._dwg.text(f"{time}", insert=(self._offset, self._top_text_y_pos),
text_anchor="start",
font_size="16px",
fill="white",
stroke='none'))
paragraph.add(self._dwg.text(f"{rain_level}", insert=(self._svg_width / 2, self._top_text_y_pos),
text_anchor="middle",
font_size="16px",
fill="white",
stroke='none'))
def write_hint(self):
"""Add the hint text at the bottom of the graph"""
@ -127,7 +156,7 @@ class RainGraph:
paragraph.add(self._dwg.text(f"{hint}", insert=(self._svg_width / 2, self._bottom_text_y_pos),
text_anchor="middle",
font_size="14px",
font_size="16px",
fill="white",
stroke='none'))
@ -226,12 +255,12 @@ class RainGraph:
if is_hour_bar:
graph_rect_center_x = x_position
graph_rect_center_y = self._graph_bottom + 15
graph_rect_center_y = self._graph_bottom + 18
paragraph = self._dwg.add(self._dwg.g(class_="roboto", ))
paragraph.add(self._dwg.text(f"{time_image.hour}h", insert=(graph_rect_center_x, graph_rect_center_y),
text_anchor="middle",
font_size="14px",
font_size="16px",
fill="white",
stroke='none'))
@ -241,15 +270,17 @@ class RainGraph:
end=(self._graph_width + self._interval_width / 2, self._graph_bottom),
stroke='white'))
def draw_current_fame_line(self):
def draw_current_fame_line(self, idx: int | None = None):
"""Draw a solid white line on the timeline at the position of the given frame index"""
x_position = self._offset
x_position = self._offset if idx is None else self._offset + idx * self._interval_width
now = self._dwg.add(self._dwg.line(start=(x_position, self._top_text_space),
end=(x_position, self._graph_bottom),
id='now',
stroke='white',
opacity=1,
stroke_width=2))
if idx is not None:
return
now.add(self._dwg.animateTransform("translate", "transform",
id="now",
from_=f"{self._offset} 0",
@ -257,8 +288,8 @@ class RainGraph:
dur=f"{self._frame_count * 0.3}s",
repeatCount="indefinite"))
def get_svg_string(self):
return self._dwg.tostring()
def get_svg_string(self, still_image: bool = False) -> bytes:
return self._dwg_still.tostring().encode() if still_image else self._dwg_animated.tostring().encode()
def insert_background(self):
with open(self._background_image_path, 'rb') as f:
@ -266,9 +297,16 @@ class RainGraph:
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
self._dwg.add(image)
def insert_cloud_layer(self):
def insert_cloud_layer(self, idx: int | None = None):
imgs = [e['image'] for e in self._animation_data['sequence']]
if idx is not None:
img = imgs[idx]
png_data = base64.b64encode(img).decode('utf-8')
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
self._dwg.add(image)
return
for i, img in enumerate(imgs):
png_data = base64.b64encode(img).decode('utf-8')
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
@ -290,3 +328,6 @@ class RainGraph:
png_data = base64.b64encode(img).decode('utf-8')
image = self._dwg.image("data:image/png;base64," + png_data, insert=(0, 0), size=self._background_size)
self._dwg.add(image)
def get_dwg(self):
return copy.deepcopy(self._dwg)