Add BelAQI index functions

This commit is contained in:
Jules 2024-06-20 23:29:49 +02:00
parent eb8fb5d6d4
commit 95fa6dde65
Signed by: jdejaegh
GPG key ID: 99D6D184CA66933A
4 changed files with 69 additions and 10 deletions

View file

@ -204,14 +204,14 @@ class IrcelineForecastClient(IrcelineBaseClient):
"""API client for forecast IRCEL - CELINE open data"""
async def get_data(self,
day: date,
timestamp: date,
features: List[ForecastFeature],
position: Tuple[float, float]
) -> Dict[Tuple[ForecastFeature, date], FeatureValue]:
"""
Get forecasted concentrations for the given features at the given position. The forecasts are downloaded for
the specified day and the 4 next days as well
:param day: date at which the forecast are computed (generally today). If unavailable, the day before will be
:param timestamp: date at which the forecast are computed (generally today). If unavailable, the day before will be
tried as well
:param features: pollutants to get the forecasts for
:param position: (lat, long)
@ -221,13 +221,13 @@ class IrcelineForecastClient(IrcelineBaseClient):
result = dict()
for feature, d in product(features, range(5)):
url = f"{forecast_base_url}/BE_{feature}_{day.strftime('%Y%m%d')}_d{d}.csv"
url = f"{forecast_base_url}/BE_{feature}_{timestamp.strftime('%Y%m%d')}_d{d}.csv"
try:
r: ClientResponse = await self._api_cached_wrapper(url)
ts = day
ts = timestamp
except IrcelineApiError:
# retry for the day before
yesterday = day - timedelta(days=1)
yesterday = timestamp - timedelta(days=1)
print('here')
url = f"{forecast_base_url}/BE_{feature}_{yesterday.strftime('%Y%m%d')}_d{d}.csv"
try:
@ -235,7 +235,7 @@ class IrcelineForecastClient(IrcelineBaseClient):
ts = yesterday
except IrcelineApiError:
# if it fails twice, just set None and go to the next
result[(feature, day + timedelta(days=d))] = FeatureValue(value=None, timestamp=day)
result[(feature, timestamp + timedelta(days=d))] = FeatureValue(value=None, timestamp=timestamp)
continue
result[(feature, ts + timedelta(days=d))] = FeatureValue(

View file

@ -6,7 +6,11 @@ https://www.irceline.be/en/air-quality/measurements/belaqi-air-quality-index/inf
> are applied to the latest hourly mean O3 and NO2 concentrations and the running 24-hourly mean PM2.5 and PM10
> concentrations.
"""
from src.open_irceline.data import BelAqiIndex
from datetime import datetime, date
from typing import Tuple, Dict
from src.open_irceline.api import IrcelineRioClient, IrcelineForecastClient
from src.open_irceline.data import BelAqiIndex, RioFeature, ForecastFeature
def belaqi_index(pm10: float, pm25: float, o3: float, no2: float) -> BelAqiIndex:
@ -52,3 +56,56 @@ def belaqi_index(pm10: float, pm25: float, o3: float, no2: float) -> BelAqiIndex
elif pm10 >= 0 or pm25 >= 0 or o3 >= 0 or no2 >= 0:
return BelAqiIndex.EXCELLENT
async def belaqi_index_actual(rio_client: IrcelineRioClient, position: Tuple[float, float],
timestamp: datetime | None = None) -> BelAqiIndex:
"""
Get current BelAQI index value for the given position using the rio_client
:param rio_client: client for the RIO WFS service
:param position: position for which to get the data
:param timestamp: desired time for the data (now if None)
:return: BelAQI index value for the position at the time
"""
if timestamp is None:
timestamp = datetime.utcnow()
components = await rio_client.get_data(
timestamp=timestamp,
features=[RioFeature.PM10_24HMEAN, RioFeature.PM25_24HMEAN, RioFeature.O3_HMEAN, RioFeature.NO2_HMEAN],
position=position
)
return belaqi_index(components[RioFeature.PM10_24HMEAN].get('value', -1),
components[RioFeature.PM25_24HMEAN].get('value', -1),
components[RioFeature.O3_HMEAN].get('value', -1),
components[RioFeature.NO2_HMEAN].get('value', -1))
async def belaqi_index_forecast(forecast_client: IrcelineForecastClient, position: Tuple[float, float],
timestamp: date | None = None) -> Dict[date, BelAqiIndex]:
"""
Get forecasted BelAQI index value for the given position using the forecast_client.
Data is downloaded for the given day and the four next days
:param forecast_client: client for the forecast data
:param position: position for which to get the data
:param timestamp: day at which the forecast are issued
:return: dict mapping a day to the forecasted BelAQI index
"""
if timestamp is None:
timestamp = date.today()
components = await forecast_client.get_data(
timestamp=timestamp,
features=[ForecastFeature.PM10_DMEAN, ForecastFeature.PM25_DMEAN, ForecastFeature.O3_MAXHMEAN,
ForecastFeature.NO2_MAXHMEAN],
position=position
)
result = dict()
for _, day in components.keys():
result[day] = belaqi_index(components[(ForecastFeature.PM10_DMEAN, day)].get('value', -1),
components[(ForecastFeature.PM25_DMEAN, day)].get('value', -1),
components[(ForecastFeature.O3_MAXHMEAN, day)].get('value', -1),
components[(ForecastFeature.NO2_MAXHMEAN, day)].get('value', -1))
return result

View file

@ -23,7 +23,7 @@ async def test_cached_calls():
client = IrcelineForecastClient(session)
_ = await client.get_data(
day=date(2024, 6, 19),
timestamp=date(2024, 6, 19),
features=[ForecastFeature.NO2_MAXHMEAN],
position=(50.45, 4.85)
)
@ -40,7 +40,7 @@ async def test_cached_calls():
session.request.assert_has_calls(calls)
_ = await client.get_data(
day=date(2024, 6, 19),
timestamp=date(2024, 6, 19),
features=[ForecastFeature.NO2_MAXHMEAN],
position=(50.45, 4.85)
)
@ -62,7 +62,7 @@ async def test_missed_cached_calls():
client = IrcelineForecastClient(session)
r = await client.get_data(
day=date(2024, 6, 21),
timestamp=date(2024, 6, 21),
features=[ForecastFeature.NO2_MAXHMEAN],
position=(50.45, 4.85)
)

View file

@ -139,3 +139,5 @@ def test_belaqi_value_error():
with pytest.raises(ValueError):
belaqi_index(1, 0, 12, -8888)
# TODO add more test for the other BelAQI functions