Remove YAML config and setup UI config

This commit is contained in:
Jules 2023-12-26 16:01:32 +01:00
parent 5a7c0ae8f0
commit e5c5a9223c
Signed by: jdejaegh
GPG key ID: 99D6D184CA66933A
8 changed files with 267 additions and 149 deletions

View file

@ -1,7 +1,39 @@
"""Integration for IRM KMI weather"""
from homeassistant.components.weather import Forecast
class IrmKmiForecast(Forecast):
"""Forecast class with additional attributes for IRM KMI"""
text_fr: str | None
text_nl: str | None
# File inspired from https://github.com/ludeeus/integration_blueprint
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.const import CONF_ZONE
from .const import DOMAIN, PLATFORMS
from .coordinator import IrmKmiCoordinator
from .weather import IrmKmiWeather
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up this integration using UI."""
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = coordinator = IrmKmiCoordinator(hass, entry.data[CONF_ZONE])
# https://developers.home-assistant.io/docs/integration_fetching_data#coordinated-single-api-poll-for-data-for-all-entities
await coordinator.async_config_entry_first_refresh()
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(async_reload_entry))
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Handle removal of an entry."""
if unloaded := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
hass.data[DOMAIN].pop(entry.entry_id)
return unloaded
async def async_reload_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Reload config entry."""
await async_unload_entry(hass, entry)
await async_setup_entry(hass, entry)

View file

@ -1,6 +1,7 @@
"""API Client for IRM KMI weather"""
from __future__ import annotations
import logging
import asyncio
import socket
@ -9,6 +10,8 @@ import async_timeout
import hashlib
from datetime import datetime
_LOGGER = logging.getLogger(__name__)
class IrmKmiApiError(Exception):
"""Exception to indicate a general API error."""
@ -26,12 +29,6 @@ class IrmKmiApiParametersError(
"""Exception to indicate a parameter error."""
class IrmKmiApiAuthenticationError(
IrmKmiApiError
):
"""Exception to indicate an authentication error."""
def _api_key(method_name: str):
"""Get API key."""
return hashlib.md5(f"r9EnW374jkJ9acc;{method_name};{datetime.now().strftime('%d/%m/%Y')}".encode()).hexdigest()
@ -39,7 +36,7 @@ def _api_key(method_name: str):
class IrmKmiApiClient:
"""Sample API Client."""
COORD_DECIMALS = 6
def __init__(self, session: aiohttp.ClientSession) -> None:
"""Sample API Client."""
self._session = session
@ -54,6 +51,11 @@ class IrmKmiApiClient:
async def get_forecasts_coord(self, coord: dict) -> any:
"""Get forecasts for given city."""
assert 'lat' in coord
assert 'long' in coord
coord['lat'] = round(coord['lat'], self.COORD_DECIMALS)
coord['long'] = round(coord['long'], self.COORD_DECIMALS)
return await self._api_wrapper(
params={"s": "getForecasts"} | coord
)
@ -75,6 +77,7 @@ class IrmKmiApiClient:
try:
async with async_timeout.timeout(10):
_LOGGER.debug(f"Calling for {params}")
response = await self._session.request(
method=method,
url=f"{self._base_url}{path}",
@ -82,6 +85,7 @@ class IrmKmiApiClient:
json=data,
params=params
)
_LOGGER.debug(f"API status code {response.status}")
response.raise_for_status()
return await response.json()

View file

@ -0,0 +1,41 @@
import logging
import voluptuous as vol
from homeassistant.components.zone import DOMAIN as ZONE_DOMAIN
from homeassistant.config_entries import ConfigFlow
from homeassistant.const import CONF_ZONE
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.selector import EntitySelector, EntitySelectorConfig
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
class IrmKmiConfigFlow(ConfigFlow, domain=DOMAIN):
VERSION = 1
async def async_step_user(self, user_input: dict | None = None) -> FlowResult:
if user_input is not None:
_LOGGER.debug(f"Provided config user is: {user_input}")
await self.async_set_unique_id(user_input[CONF_ZONE])
self._abort_if_unique_id_configured()
state = self.hass.states.get(user_input[CONF_ZONE])
return self.async_create_entry(
title=state.name if state else "IRM KMI",
data={CONF_ZONE: user_input[CONF_ZONE]},
)
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_ZONE): EntitySelector(
EntitySelectorConfig(domain=ZONE_DOMAIN),
),
}
),
)

View file

@ -11,8 +11,14 @@ from homeassistant.components.weather import (
ATTR_CONDITION_CLEAR_NIGHT,
ATTR_CONDITION_EXCEPTIONAL
)
from homeassistant.const import Platform
DOMAIN = 'irm_kmi'
PLATFORMS: list[Platform] = [Platform.WEATHER]
OUT_OF_BENELUX = ["außerhalb der Benelux (Brussels)",
"Hors de Belgique (Bxl)",
"Outside the Benelux (Brussels)",
"Buiten de Benelux (Brussel)"]
# map ('ww', 'dayNight') tuple from IRM KMI to HA conditions
IRM_KMI_TO_HA_CONDITION_MAP = {

View file

@ -7,19 +7,100 @@ from typing import List
import async_timeout
from homeassistant.components.weather import Forecast
from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import (
DataUpdateCoordinator,
UpdateFailed,
)
from . import IrmKmiForecast
from .const import IRM_KMI_TO_HA_CONDITION_MAP as CDT_MAP
from .data import IrmKmiForecast
from .const import OUT_OF_BENELUX, IRM_KMI_TO_HA_CONDITION_MAP as CDT_MAP
from .api import IrmKmiApiClient, IrmKmiApiError
_LOGGER = logging.getLogger(__name__)
class IrmKmiCoordinator(DataUpdateCoordinator):
"""Coordinator to update data from IRM KMI"""
def __init__(self, hass, zone):
"""Initialize my coordinator."""
super().__init__(
hass,
_LOGGER,
# Name of the data. For logging purposes.
name="IRM KMI weather",
# Polling interval. Will only be polled if there are subscribers.
update_interval=timedelta(seconds=30),
)
self._api_client = IrmKmiApiClient(session=async_get_clientsession(hass))
self._zone = zone
async def _async_update_data(self):
"""Fetch data from API endpoint.
This is the place to pre-process the data to lookup tables
so entities can quickly look up their data.
"""
if (zone := self.hass.states.get(self._zone)) is None:
raise UpdateFailed(f"Zone '{self._zone}' not found")
try:
# Note: asyncio.TimeoutError and aiohttp.ClientError are already
# handled by the data update coordinator.
async with async_timeout.timeout(10):
api_data = await self._api_client.get_forecasts_coord(
{'lat': zone.attributes[ATTR_LATITUDE],
'long': zone.attributes[ATTR_LONGITUDE]}
)
_LOGGER.debug(f"Observation for {api_data.get('cityName', '')}: {api_data.get('obs', '{}')}")
except IrmKmiApiError as err:
raise UpdateFailed(f"Error communicating with API: {err}")
if api_data.get('cityName', None) in OUT_OF_BENELUX:
raise UpdateFailed(f"Zone '{self._zone}' is out of Benelux and forecast is only available in the Benelux")
return self.process_api_data(api_data)
@staticmethod
def process_api_data(api_data):
# Process data to get current hour forecast
now_hourly = None
hourly_forecast_data = api_data.get('for', {}).get('hourly')
if not (hourly_forecast_data is None
or not isinstance(hourly_forecast_data, list)
or len(hourly_forecast_data) == 0):
for current in hourly_forecast_data[:2]:
if datetime.now().strftime('%H') == current['hour']:
now_hourly = current
# Get UV index
module_data = api_data.get('module', None)
uv_index = None
if not (module_data is None or not isinstance(module_data, list)):
for module in module_data:
if module.get('type', None) == 'uv':
uv_index = module.get('data', {}).get('levelValue')
# Put everything together
processed_data = {
'current_weather': {
'condition': CDT_MAP.get(
(api_data.get('obs', {}).get('ww'), api_data.get('obs', {}).get('dayNight')), None),
'temperature': api_data.get('obs', {}).get('temp'),
'wind_speed': now_hourly.get('windSpeedKm', None) if now_hourly is not None else None,
'wind_gust_speed': now_hourly.get('windPeakSpeedKm', None) if now_hourly is not None else None,
'wind_bearing': now_hourly.get('windDirectionText', {}).get('en') if now_hourly is not None else None,
'pressure': now_hourly.get('pressure', None) if now_hourly is not None else None,
'uv_index': uv_index
},
'daily_forecast': IrmKmiCoordinator.daily_list_to_forecast(api_data.get('for', {}).get('daily')),
'hourly_forecast': IrmKmiCoordinator.hourly_list_to_forecast(api_data.get('for', {}).get('hourly'))
}
return processed_data
@staticmethod
def hourly_list_to_forecast(data: List[dict] | None) -> List[Forecast] | None:
if data is None or not isinstance(data, list) or len(data) == 0:
return None
@ -61,7 +142,7 @@ def hourly_list_to_forecast(data: List[dict] | None) -> List[Forecast] | None:
return forecasts
@staticmethod
def daily_list_to_forecast(data: List[dict] | None) -> List[Forecast] | None:
if data is None or not isinstance(data, list) or len(data) == 0:
return None
@ -96,71 +177,3 @@ def daily_list_to_forecast(data: List[dict] | None) -> List[Forecast] | None:
n_days += 1
return forecasts
class IrmKmiCoordinator(DataUpdateCoordinator):
"""Coordinator to update data from IRM KMI"""
def __init__(self, hass, coord: dict):
"""Initialize my coordinator."""
super().__init__(
hass,
_LOGGER,
# Name of the data. For logging purposes.
name="IRM KMI weather",
# Polling interval. Will only be polled if there are subscribers.
update_interval=timedelta(seconds=30),
)
self._api_client = IrmKmiApiClient(session=async_get_clientsession(hass))
self._coord = coord
async def _async_update_data(self):
"""Fetch data from API endpoint.
This is the place to pre-process the data to lookup tables
so entities can quickly look up their data.
"""
try:
# Note: asyncio.TimeoutError and aiohttp.ClientError are already
# handled by the data update coordinator.
async with async_timeout.timeout(10):
api_data = await self._api_client.get_forecasts_coord(self._coord)
_LOGGER.debug(f"Observation for {api_data.get('cityName', '')}: {api_data.get('obs', '{}')}")
# Process data to get current hour forecast
now_hourly = None
hourly_forecast_data = api_data.get('for', {}).get('hourly')
if not (hourly_forecast_data is None
or not isinstance(hourly_forecast_data, list)
or len(hourly_forecast_data) == 0):
for current in hourly_forecast_data[:2]:
if datetime.now().strftime('%H') == current['hour']:
now_hourly = current
# Get UV index
module_data = api_data.get('module', None)
uv_index = None
if not (module_data is None or not isinstance(module_data, list)):
for module in module_data:
if module.get('type', None) == 'uv':
uv_index = module.get('data', {}).get('levelValue')
# Put everything together
processed_data = {
'current_weather': {
'condition': CDT_MAP.get(
(api_data.get('obs', {}).get('ww'), api_data.get('obs', {}).get('dayNight')), None),
'temperature': api_data.get('obs', {}).get('temp'),
'wind_speed': now_hourly.get('windSpeedKm', None) if now_hourly is not None else None,
'wind_gust_speed': now_hourly.get('windPeakSpeedKm', None) if now_hourly is not None else None,
'wind_bearing': now_hourly.get('windDirectionText', {}).get('en') if now_hourly is not None else None,
'pressure': now_hourly.get('pressure', None) if now_hourly is not None else None,
'uv_index': uv_index
},
'daily_forecast': daily_list_to_forecast(api_data.get('for', {}).get('daily')),
'hourly_forecast': hourly_list_to_forecast(api_data.get('for', {}).get('hourly'))
}
return processed_data
except IrmKmiApiError as err:
raise UpdateFailed(f"Error communicating with API: {err}")

View file

@ -0,0 +1,9 @@
from homeassistant.components.weather import Forecast
class IrmKmiForecast(Forecast):
"""Forecast class with additional attributes for IRM KMI"""
# TODO: add condition_2 as well and evolution to match data from the API?
text_fr: str | None
text_nl: str | None

View file

@ -2,6 +2,7 @@
"domain": "irm_kmi",
"name": "IRM KMI Weather Belgium",
"codeowners": ["@jdejaegh"],
"config_flow": true,
"dependencies": [],
"documentation": "https://github.com/jdejaegh/irm-kmi-ha/",
"integration_type": "service",

View file

@ -2,37 +2,52 @@ import logging
from typing import List
from homeassistant.components.weather import WeatherEntity, WeatherEntityFeature, Forecast
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import UnitOfTemperature, UnitOfSpeed, UnitOfPrecipitationDepth, UnitOfPressure
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
)
from . import DOMAIN
from .coordinator import IrmKmiCoordinator
_LOGGER = logging.getLogger(__name__)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
_LOGGER.debug(f"IRM KMI setup. Config: {config}")
coordinator = IrmKmiCoordinator(hass, coord={'lat': config.get("lat"), 'long': config.get("lon")})
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback):
"""Set up the weather entry."""
_LOGGER.debug(f'async_setup_entry entry is: {entry}')
coordinator = hass.data[DOMAIN][entry.entry_id]
await coordinator.async_config_entry_first_refresh()
async_add_entities([IrmKmiWeather(
coordinator,
config.get("name", "IRM KMI Weather")
)])
async_add_entities(
[IrmKmiWeather(coordinator, entry)]
)
class IrmKmiWeather(CoordinatorEntity, WeatherEntity):
def __init__(self, coordinator: IrmKmiCoordinator, name: str) -> None:
def __init__(self,
coordinator: IrmKmiCoordinator,
entry: ConfigEntry
) -> None:
super().__init__(coordinator)
self._name = name
self._name = entry.title
self._attr_unique_id = entry.entry_id
self._attr_device_info = DeviceInfo(
entry_type=DeviceEntryType.SERVICE,
identifiers={(DOMAIN, entry.entry_id)},
manufacturer="IRM KMI",
name=entry.title
)
@property
def supported_features(self) -> WeatherEntityFeature:
features = WeatherEntityFeature(0)
features |= WeatherEntityFeature.FORECAST_DAILY
features |= WeatherEntityFeature.FORECAST_TWICE_DAILY
features |= WeatherEntityFeature.FORECAST_HOURLY
return features
@ -85,17 +100,14 @@ class IrmKmiWeather(CoordinatorEntity, WeatherEntity):
def uv_index(self) -> float | None:
return self.coordinator.data.get('current_weather').get('uv_index')
@property
def forecast(self) -> list[Forecast] | None:
result = list()
if self.coordinator.data.get('daily_forecast') is not None:
result += self.coordinator.data.get('daily_forecast')
if self.coordinator.data.get('hourly_forecast') is not None:
result += self.coordinator.data.get('hourly_forecast')
return result
async def async_forecast_twice_daily(self) -> List[Forecast] | None:
return self.coordinator.data.get('daily_forecast')
async def async_forecast_daily(self) -> list[Forecast] | None:
data: list[Forecast] = self.coordinator.data.get('daily_forecast')
if not isinstance(data, list):
return None
return [f for f in data if f.get('is_daytime')]
async def async_forecast_hourly(self) -> list[Forecast] | None:
return self.coordinator.data.get('hourly_forecast')