diff --git a/src/open_irceline/api.py b/src/open_irceline/api.py index 7a5a989..4ee3577 100644 --- a/src/open_irceline/api.py +++ b/src/open_irceline/api.py @@ -204,14 +204,14 @@ class IrcelineForecastClient(IrcelineBaseClient): """API client for forecast IRCEL - CELINE open data""" async def get_data(self, - day: date, + timestamp: date, features: List[ForecastFeature], position: Tuple[float, float] ) -> Dict[Tuple[ForecastFeature, date], FeatureValue]: """ Get forecasted concentrations for the given features at the given position. The forecasts are downloaded for the specified day and the 4 next days as well - :param day: date at which the forecast are computed (generally today). If unavailable, the day before will be + :param timestamp: date at which the forecast are computed (generally today). If unavailable, the day before will be tried as well :param features: pollutants to get the forecasts for :param position: (lat, long) @@ -221,13 +221,13 @@ class IrcelineForecastClient(IrcelineBaseClient): result = dict() for feature, d in product(features, range(5)): - url = f"{forecast_base_url}/BE_{feature}_{day.strftime('%Y%m%d')}_d{d}.csv" + url = f"{forecast_base_url}/BE_{feature}_{timestamp.strftime('%Y%m%d')}_d{d}.csv" try: r: ClientResponse = await self._api_cached_wrapper(url) - ts = day + ts = timestamp except IrcelineApiError: # retry for the day before - yesterday = day - timedelta(days=1) + yesterday = timestamp - timedelta(days=1) print('here') url = f"{forecast_base_url}/BE_{feature}_{yesterday.strftime('%Y%m%d')}_d{d}.csv" try: @@ -235,7 +235,7 @@ class IrcelineForecastClient(IrcelineBaseClient): ts = yesterday except IrcelineApiError: # if it fails twice, just set None and go to the next - result[(feature, day + timedelta(days=d))] = FeatureValue(value=None, timestamp=day) + result[(feature, timestamp + timedelta(days=d))] = FeatureValue(value=None, timestamp=timestamp) continue result[(feature, ts + timedelta(days=d))] = FeatureValue( diff --git a/src/open_irceline/belaqi.py b/src/open_irceline/belaqi.py index 8ad50ef..4366ae6 100644 --- a/src/open_irceline/belaqi.py +++ b/src/open_irceline/belaqi.py @@ -6,7 +6,11 @@ https://www.irceline.be/en/air-quality/measurements/belaqi-air-quality-index/inf > are applied to the latest hourly mean O3 and NO2 concentrations and the running 24-hourly mean PM2.5 and PM10 > concentrations. """ -from src.open_irceline.data import BelAqiIndex +from datetime import datetime, date +from typing import Tuple, Dict + +from src.open_irceline.api import IrcelineRioClient, IrcelineForecastClient +from src.open_irceline.data import BelAqiIndex, RioFeature, ForecastFeature def belaqi_index(pm10: float, pm25: float, o3: float, no2: float) -> BelAqiIndex: @@ -52,3 +56,56 @@ def belaqi_index(pm10: float, pm25: float, o3: float, no2: float) -> BelAqiIndex elif pm10 >= 0 or pm25 >= 0 or o3 >= 0 or no2 >= 0: return BelAqiIndex.EXCELLENT + + +async def belaqi_index_actual(rio_client: IrcelineRioClient, position: Tuple[float, float], + timestamp: datetime | None = None) -> BelAqiIndex: + """ + Get current BelAQI index value for the given position using the rio_client + :param rio_client: client for the RIO WFS service + :param position: position for which to get the data + :param timestamp: desired time for the data (now if None) + :return: BelAQI index value for the position at the time + """ + if timestamp is None: + timestamp = datetime.utcnow() + components = await rio_client.get_data( + timestamp=timestamp, + features=[RioFeature.PM10_24HMEAN, RioFeature.PM25_24HMEAN, RioFeature.O3_HMEAN, RioFeature.NO2_HMEAN], + position=position + ) + + return belaqi_index(components[RioFeature.PM10_24HMEAN].get('value', -1), + components[RioFeature.PM25_24HMEAN].get('value', -1), + components[RioFeature.O3_HMEAN].get('value', -1), + components[RioFeature.NO2_HMEAN].get('value', -1)) + + +async def belaqi_index_forecast(forecast_client: IrcelineForecastClient, position: Tuple[float, float], + timestamp: date | None = None) -> Dict[date, BelAqiIndex]: + """ + Get forecasted BelAQI index value for the given position using the forecast_client. + Data is downloaded for the given day and the four next days + :param forecast_client: client for the forecast data + :param position: position for which to get the data + :param timestamp: day at which the forecast are issued + :return: dict mapping a day to the forecasted BelAQI index + """ + if timestamp is None: + timestamp = date.today() + components = await forecast_client.get_data( + timestamp=timestamp, + features=[ForecastFeature.PM10_DMEAN, ForecastFeature.PM25_DMEAN, ForecastFeature.O3_MAXHMEAN, + ForecastFeature.NO2_MAXHMEAN], + position=position + ) + + result = dict() + + for _, day in components.keys(): + result[day] = belaqi_index(components[(ForecastFeature.PM10_DMEAN, day)].get('value', -1), + components[(ForecastFeature.PM25_DMEAN, day)].get('value', -1), + components[(ForecastFeature.O3_MAXHMEAN, day)].get('value', -1), + components[(ForecastFeature.NO2_MAXHMEAN, day)].get('value', -1)) + + return result diff --git a/tests/test_api_forecasts.py b/tests/test_api_forecasts.py index 85eeecf..b339a40 100644 --- a/tests/test_api_forecasts.py +++ b/tests/test_api_forecasts.py @@ -23,7 +23,7 @@ async def test_cached_calls(): client = IrcelineForecastClient(session) _ = await client.get_data( - day=date(2024, 6, 19), + timestamp=date(2024, 6, 19), features=[ForecastFeature.NO2_MAXHMEAN], position=(50.45, 4.85) ) @@ -40,7 +40,7 @@ async def test_cached_calls(): session.request.assert_has_calls(calls) _ = await client.get_data( - day=date(2024, 6, 19), + timestamp=date(2024, 6, 19), features=[ForecastFeature.NO2_MAXHMEAN], position=(50.45, 4.85) ) @@ -62,7 +62,7 @@ async def test_missed_cached_calls(): client = IrcelineForecastClient(session) r = await client.get_data( - day=date(2024, 6, 21), + timestamp=date(2024, 6, 21), features=[ForecastFeature.NO2_MAXHMEAN], position=(50.45, 4.85) ) diff --git a/tests/test_belaqi.py b/tests/test_belaqi.py index f4ffee4..3d4ffde 100644 --- a/tests/test_belaqi.py +++ b/tests/test_belaqi.py @@ -139,3 +139,5 @@ def test_belaqi_value_error(): with pytest.raises(ValueError): belaqi_index(1, 0, 12, -8888) + +# TODO add more test for the other BelAQI functions \ No newline at end of file