Merge pull request #5 from jdejaegh/wms_wfs

Use WMS and WFS for all the data
Authored by Jules on 2024-06-30 17:13:24 +02:00, committed by GitHub
commit c878bf8f81
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 3695 additions and 807 deletions


@@ -24,9 +24,8 @@ pip install open-irceline
```python
import aiohttp
import asyncio
from datetime import datetime, date
from open_irceline import IrcelineRioClient, RioFeature, IrcelineForecastClient, ForecastFeature, belaqi_index_rio_hourly
from datetime import datetime
from open_irceline import IrcelineRioClient, IrcelineForecastClient, ForecastFeature, RioFeature
async def get_rio_interpolated_data():
@@ -43,12 +42,11 @@ async def get_rio_interpolated_data():
print(f"PM10 {result[RioFeature.PM10_HMEAN]['value']} µg/m³")
async def get_forecast():
async def get_o3_forecast():
"""Get forecast for O3 concentration for Brussels for the next days"""
async with aiohttp.ClientSession() as session:
client = IrcelineForecastClient(session)
result = await client.get_data(
timestamp=date.today(),
features=[ForecastFeature.O3_MAXHMEAN],
position=(50.85, 4.35) # (lat, lon) for Brussels
)
@@ -57,28 +55,28 @@ async def get_forecast():
print(f"{feature} {day} {v['value']} µg/m³")
async def get_current_belaqi():
async def get_belaqi_forecast():
"""Get current BelAQI index from RIO interpolated values"""
async with aiohttp.ClientSession() as session:
client = IrcelineRioClient(session)
result = await belaqi_index_rio_hourly(
rio_client=client,
timestamp=datetime.utcnow(), # must be timezone aware
client = IrcelineForecastClient(session)
result = await client.get_data(
features=[ForecastFeature.BELAQI],
position=(50.85, 4.35) # (lat, lon) for Brussels
)
print(f"Current BelAQI index for Brussels: {result.get('value')}")
for (_, day), value in result.items():
print(day, value['value'])
if __name__ == '__main__':
print("RIO interpolated data")
print("\nInterpolated data")
asyncio.run(get_rio_interpolated_data())
print("\nO3 forecast for Brussels")
asyncio.run(get_forecast())
asyncio.run(get_o3_forecast())
print("\nCurrent BelAQI index")
asyncio.run(get_current_belaqi())
print("\nForecast BelAQI index")
asyncio.run(get_belaqi_forecast())
```
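Both new clients also expose a `get_capabilities()` helper (see the diffs below) that lists the layer names served by the WFS/WMS endpoints. A minimal sketch of how it could be used, following the same session setup as the examples above:

```python
import aiohttp
import asyncio

from open_irceline import IrcelineForecastClient, IrcelineRioClient


async def list_available_features():
    """List the feature/layer names exposed by the RIO WFS and forecast WMS services."""
    async with aiohttp.ClientSession() as session:
        rio_features = await IrcelineRioClient(session).get_capabilities()
        forecast_layers = await IrcelineForecastClient(session).get_capabilities()
        print(f"{len(rio_features)} RIO features, {len(forecast_layers)} forecast layers")


if __name__ == '__main__':
    asyncio.run(list_available_features())
```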
## Attribution


@@ -1,5 +1,6 @@
from .api import IrcelineRioClient, IrcelineForecastClient, IrcelineApiError
from .belaqi import belaqi_index_rio_hourly, belaqi_index_forecast_daily, belaqi_index_daily, belaqi_index_hourly
from .api import IrcelineApiError
from .rio import IrcelineRioClient
from .forecast import IrcelineForecastClient
from .data import RioFeature, ForecastFeature, FeatureValue, BelAqiIndex
__version__ = '2.0.0'


@@ -1,26 +1,21 @@
import asyncio
import csv
import socket
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, date
from io import StringIO
from itertools import product
from typing import Tuple, List, Dict, Set
from xml.etree import ElementTree
from typing import Tuple, List, Set
import aiohttp
import async_timeout
from aiohttp import ClientResponse
from .data import RioFeature, FeatureValue, ForecastFeature, IrcelineFeature
from .utils import SizedDict, epsg_transform, round_coordinates
from .data import IrcelineFeature
from .utils import SizedDict
_rio_wfs_base_url = 'https://geo.irceline.be/wfs'
_forecast_wms_base_url = 'https://geo.irceline.be/forecast/wms'
# noinspection HttpUrlsUsage
# There is no HTTPS version of this endpoint
_forecast_base_url = 'http://ftp.irceline.be/forecast'
_user_agent = 'github.com/jdejaegh/python-irceline'
class IrcelineApiError(Exception):
"""Exception to indicate an API error."""
@@ -32,11 +27,14 @@ class IrcelineBaseClient(ABC):
@abstractmethod
async def get_data(self,
timestamp: datetime | date,
features: List[IrcelineFeature],
position: Tuple[float, float]) -> dict:
pass
@abstractmethod
def get_capabilities(self) -> Set[str]:
pass
async def _api_wrapper(self, url: str, querystring: dict = None, headers: dict = None, method: str = 'GET'):
"""
Call the URL with the specified query string. Raises an exception for response codes >= 400
@@ -48,7 +46,6 @@ class IrcelineBaseClient(ABC):
headers = dict()
if 'User-Agent' not in headers:
headers |= {'User-Agent': _user_agent}
try:
async with async_timeout.timeout(60):
response = await self._session.request(
@@ -67,200 +64,4 @@ class IrcelineBaseClient(ABC):
except Exception as exception: # pylint: disable=broad-except
raise IrcelineApiError(f"Something really wrong happened! {exception}") from exception
async def _api_cached_wrapper(self, url: str, method: str = 'GET'):
"""
Call the API, using a cache based on the ETag value to avoid repeated calls for the same resource
:param url: URL to fetch
:param method: HTTP method (defaults to GET)
:return: response from the client
"""
if url in self._cache:
headers = {"If-None-Match": f'{self._cache.get(url, {}).get("etag")}'}
else:
headers = None
r: ClientResponse = await self._api_wrapper(url, headers=headers, method=method)
if r.status == 304:
return self._cache.get(url, {}).get("response")
elif 'ETag' in r.headers:
self._cache[url] = {'etag': r.headers['ETag'],
'response': r}
return r
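For context, the cached wrapper above is a standard conditional GET: send `If-None-Match` with the stored ETag and reuse the cached body when the server answers 304 Not Modified. A minimal standalone sketch of that pattern with plain aiohttp and an ordinary dict (not the `SizedDict` cache used by the client):

```python
import aiohttp

_etag_cache: dict[str, dict] = {}  # url -> {'etag': ..., 'body': ...}


async def fetch_with_etag(session: aiohttp.ClientSession, url: str) -> str:
    """Conditional GET: reuse the cached body when the server replies 304 Not Modified."""
    headers = {}
    if url in _etag_cache:
        headers['If-None-Match'] = _etag_cache[url]['etag']
    async with session.get(url, headers=headers) as response:
        if response.status == 304:
            return _etag_cache[url]['body']
        body = await response.text()
        if 'ETag' in response.headers:
            _etag_cache[url] = {'etag': response.headers['ETag'], 'body': body}
        return body
```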
class IrcelineRioClient(IrcelineBaseClient):
"""API client for RIO interpolated IRCEL - CELINE open data"""
async def get_data(self,
timestamp: datetime | date,
features: List[RioFeature],
position: Tuple[float, float]
) -> Dict[RioFeature, FeatureValue]:
"""
Call the WFS API to get the interpolated level of each RioFeature. Raises an exception upon API error
:param timestamp: datetime for which to get the data
:param features: list of RioFeature to fetch from the API
:param position: decimal degrees pair of coordinates
:return: dict with the response (key is RioFeature, value is FeatureValue with actual value and timestamp)
"""
# Remove one hour/day from the timestamp to handle the case where the hour has just passed but the data is not
# yet there (e.g. it is 5.01 PM, but the most recent data is for 4.00 PM)
if isinstance(timestamp, datetime):
timestamp = timestamp.replace(microsecond=0, second=0, minute=0) - timedelta(hours=1)
timestamp = timestamp.isoformat()
key = 'timestamp'
elif isinstance(timestamp, date):
timestamp = timestamp - timedelta(days=1)
timestamp = timestamp.isoformat()
key = 'date'
else:
raise IrcelineApiError(f"Wrong parameter type for timestamp: {type(timestamp)}")
lat, lon = epsg_transform(position)
querystring = {"service": "WFS",
"version": "1.3.0",
"request": "GetFeature",
"outputFormat": "application/json",
"typeName": ",".join(features),
"cql_filter":
f"{key}>='{timestamp}'"
f" AND "
f"INTERSECTS(the_geom, POINT ({lat} {lon}))"}
r: ClientResponse = await self._api_wrapper(_rio_wfs_base_url, querystring)
return self._format_result('rio', await r.json(), features)
async def get_rio_capabilities(self) -> Set[str]:
"""
Fetch the list of possible features from the WFS server
:return: set of features available on the WFS server
"""
querystring = {"service": "WFS",
"version": "1.3.0",
"request": "GetCapabilities"}
r: ClientResponse = await self._api_wrapper(_rio_wfs_base_url, querystring)
return self._parse_capabilities(await r.text())
@staticmethod
def _parse_capabilities(xml_string: str) -> Set[str]:
"""
From an XML string obtained with GetCapabilities, generate a set of feature names
:param xml_string: XML string to parse
:return: set of FeatureType Names found in the XML document
"""
try:
root = ElementTree.fromstring(xml_string)
except ElementTree.ParseError:
return set()
# noinspection HttpUrlsUsage
# We never connect to the URL, it is just the namespace in the XML
namespaces = {
'wfs': 'http://www.opengis.net/wfs',
}
path = './/wfs:FeatureTypeList/wfs:FeatureType/wfs:Name'
feature_type_names = {t.text for t in root.findall(path, namespaces)}
return feature_type_names
@staticmethod
def _format_result(prefix: str, data: dict, features: List[RioFeature]) -> dict:
"""
Format the JSON dict returned by the WFS service into a more practical dict that keeps only the latest measurement
for each requested feature
:param prefix: namespace of the feature (e.g. rio), without the colon
:param data: JSON dict value as returned by the API
:param features: RioFeatures wanted in the final dict
:return: reduced dict, key is RioFeature, value is FeatureValue
"""
if data.get('type', None) != 'FeatureCollection' or not isinstance(data.get('features', None), list):
return dict()
features_api = data.get('features', [])
result = dict()
for f in features_api:
props = f.get('properties', {})
if (f.get('id', None) is None or
props.get('value', None) is None):
continue
if (props.get('timestamp', None) is None and
props.get('date', None) is None):
continue
try:
if 'timestamp' in props.keys():
timestamp = datetime.fromisoformat(props.get('timestamp'))
else:
# Cut last character as the date is written '2024-06-15Z' which is not ISO compliant
timestamp = date.fromisoformat(props.get('date')[:-1])
value = float(props.get('value'))
except (TypeError, ValueError):
continue
name = f"{prefix}:{f.get('id').split('.')[0]}"
if name not in [f'{f}' for f in features]:
continue
if name not in result or result[name]['timestamp'] < timestamp:
result[name] = FeatureValue(timestamp=timestamp, value=value)
return result
class IrcelineForecastClient(IrcelineBaseClient):
"""API client for forecast IRCEL - CELINE open data"""
async def get_data(self,
timestamp: date,
features: List[ForecastFeature],
position: Tuple[float, float]
) -> Dict[Tuple[ForecastFeature, date], FeatureValue]:
"""
Get forecasted concentrations for the given features at the given position. The forecasts are downloaded for
the specified day and the next 4 days
:param timestamp: date at which the forecasts are computed (generally today). If unavailable, the day before is
tried as well
:param features: pollutants to get the forecasts for
:param position: (lat, long)
:return: dict where key is (ForecastFeature, date of the forecast) and value is a FeatureValue
"""
x, y = round_coordinates(position[0], position[1])
result = dict()
for feature, d in product(features, range(5)):
url = f"{_forecast_base_url}/BE_{feature}_{timestamp.strftime('%Y%m%d')}_d{d}.csv"
try:
r: ClientResponse = await self._api_cached_wrapper(url)
ts = timestamp
except IrcelineApiError:
# retry for the day before
yesterday = timestamp - timedelta(days=1)
url = f"{_forecast_base_url}/BE_{feature}_{yesterday.strftime('%Y%m%d')}_d{d}.csv"
try:
r: ClientResponse = await self._api_cached_wrapper(url)
ts = yesterday
except IrcelineApiError:
# if it fails twice, just set None and go to the next
result[(feature, timestamp + timedelta(days=d))] = FeatureValue(value=None, timestamp=timestamp)
continue
result[(feature, ts + timedelta(days=d))] = FeatureValue(
value=self.extract_result_from_csv(x, y, await r.text()),
timestamp=ts)
return result
@staticmethod
def extract_result_from_csv(x: float, y: float, csv_text: str) -> float | None:
"""
Find the value of the forecast for the given (x, y) position in the csv text.
x, y should already be rounded to match the positions found in the csv
:param x: latitude (rounded)
:param y: longitude (rounded)
:param csv_text: text of the CSV file
:return: value matching the position if found, else None
"""
f = StringIO(csv_text)
for row in csv.reader(f, delimiter=';'):
try:
if x == float(row[1]) and y == float(row[2]):
return float(row[3])
except (ValueError, IndexError):
continue
return None
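A quick illustration of the CSV lookup above on the pre-WMS `IrcelineForecastClient`; the file content and column layout here are invented to match what the parser expects (semicolon-separated, latitude in column 1, longitude in column 2, value in column 3):

```python
from open_irceline import IrcelineForecastClient  # pre-WMS client, which still has this helper

csv_text = (
    "id;lat;lon;value\n"      # header row is skipped because 'lat' is not a float
    "BE_a;50.45;4.85;13.08\n"
    "BE_b;50.50;4.90;11.20"
)
assert IrcelineForecastClient.extract_result_from_csv(50.45, 4.85, csv_text) == 13.08
assert IrcelineForecastClient.extract_result_from_csv(51.00, 3.00, csv_text) is None  # no matching row
```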


@@ -1,192 +0,0 @@
"""
Compute the BelAQI index from concentrations of PM10, PM2.5, O3 and NO2, based on
https://www.irceline.be/en/air-quality/measurements/air-quality-index-november-2022/info_nov2022
"""
from datetime import datetime, date
from typing import Tuple, Dict, Final
from .api import IrcelineRioClient, IrcelineForecastClient
from .data import BelAqiIndex, RioFeature, ForecastFeature, FeatureValue
# Ratio values from Figure 2 at
# https://www.irceline.be/en/air-quality/measurements/air-quality-index-november-2022/info_nov2022
NO2_MAX_HMEAN_TO_DMEAN: Final = 1.51
O3_MAX_HMEAN_TO_MAX8HMEAN: Final = 1.10
def belaqi_index_daily(pm10: float, pm25: float, o3: float, no2: float) -> BelAqiIndex:
"""
Computes the daily BelAQI index based on the pollutant concentrations
Raises ValueError if any component is None or negative
Values taken from Table 1 of
https://www.irceline.be/en/air-quality/measurements/air-quality-index-november-2022/info_nov2022
:param pm10: PM10 daily mean (µg/m³)
:param pm25: PM2.5 daily mean (µg/m³)
:param o3: O3 maximum running 8-hour mean (µg/m³)
:param no2: NO2 daily mean (µg/m³)
:return: BelAQI index from 1 to 10 (value of the BelAqiIndex enum)
"""
if pm10 is None or pm25 is None or o3 is None or no2 is None:
raise ValueError("All the components should be valued (at lest one is None here)")
if pm10 < 0 or pm25 < 0 or o3 < 0 or no2 < 0:
raise ValueError("All the components should have a positive value")
elif pm10 > 100 or pm25 > 50 or o3 > 220 or no2 > 50:
return BelAqiIndex.HORRIBLE
elif pm10 > 80 or pm25 > 40 or o3 > 190 or no2 > 40:
return BelAqiIndex.VERY_BAD
elif pm10 > 70 or pm25 > 35 or o3 > 160 or no2 > 35:
return BelAqiIndex.BAD
elif pm10 > 60 or pm25 > 25 or o3 > 130 or no2 > 30:
return BelAqiIndex.VERY_POOR
elif pm10 > 45 or pm25 > 15 or o3 > 100 or no2 > 25:
return BelAqiIndex.POOR
elif pm10 > 35 or pm25 > 10 or o3 > 80 or no2 > 20:
return BelAqiIndex.MODERATE
elif pm10 > 25 or pm25 > 7.5 or o3 > 70 or no2 > 15:
return BelAqiIndex.FAIRLY_GOOD
elif pm10 > 15 or pm25 > 5 or o3 > 60 or no2 > 10:
return BelAqiIndex.GOOD
elif pm10 > 5 or pm25 > 2.5 or o3 > 30 or no2 > 5:
return BelAqiIndex.VERY_GOOD
elif pm10 >= 0 or pm25 >= 0 or o3 >= 0 or no2 >= 0:
return BelAqiIndex.EXCELLENT
def belaqi_index_hourly(pm10: float, pm25: float, o3: float, no2: float) -> BelAqiIndex:
"""
Computes the hourly BelAQI index based on the pollutant concentrations
Raises ValueError if any component is None or negative
Values taken from Table 2 of
https://www.irceline.be/en/air-quality/measurements/air-quality-index-november-2022/info_nov2022
:param pm10: PM10 hourly mean (µg/m³)
:param pm25: PM2.5 hourly mean (µg/m³)
:param o3: O3 hourly mean (µg/m³)
:param no2: NO2 hourly mean (µg/m³)
:return: BelAQI index from 1 to 10 (value of the BelAqiIndex enum)
"""
if pm10 is None or pm25 is None or o3 is None or no2 is None:
raise ValueError("All the components should be valued (at lest one is None here)")
if pm10 < 0 or pm25 < 0 or o3 < 0 or no2 < 0:
raise ValueError("All the components should have a positive value")
elif pm10 > 140 or pm25 > 75 or o3 > 240 or no2 > 75:
return BelAqiIndex.HORRIBLE
elif pm10 > 110 or pm25 > 60 or o3 > 210 or no2 > 60:
return BelAqiIndex.VERY_BAD
elif pm10 > 95 or pm25 > 50 or o3 > 180 or no2 > 50:
return BelAqiIndex.BAD
elif pm10 > 80 or pm25 > 35 or o3 > 150 or no2 > 45:
return BelAqiIndex.VERY_POOR
elif pm10 > 60 or pm25 > 20 or o3 > 110 or no2 > 40:
return BelAqiIndex.POOR
elif pm10 > 45 or pm25 > 15 or o3 > 90 or no2 > 30:
return BelAqiIndex.MODERATE
elif pm10 > 35 or pm25 > 10 or o3 > 75 or no2 > 20:
return BelAqiIndex.FAIRLY_GOOD
elif pm10 > 20 or pm25 > 7.5 or o3 > 65 or no2 > 15:
return BelAqiIndex.GOOD
elif pm10 > 10 or pm25 > 3.5 or o3 > 30 or no2 > 10:
return BelAqiIndex.VERY_GOOD
elif pm10 >= 0 or pm25 >= 0 or o3 >= 0 or no2 >= 0:
return BelAqiIndex.EXCELLENT
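In both functions the cascade of `elif` branches returns the first (worst) band whose threshold any single component exceeds, so the overall index is driven by the worst-scoring pollutant. A small worked example against the hourly thresholds above (the imports refer to the pre-PR layout, since this module is removed by the change):

```python
from open_irceline.belaqi import belaqi_index_hourly  # module removed by this PR
from open_irceline.data import BelAqiIndex

# PM10 (22 µg/m³) and PM2.5 (8 µg/m³) fall in the GOOD band and NO2 (12 µg/m³) in VERY_GOOD,
# but O3 (85 µg/m³) exceeds the 75 µg/m³ FAIRLY_GOOD threshold, so the worst band wins.
assert belaqi_index_hourly(pm10=22, pm25=8, o3=85, no2=12) == BelAqiIndex.FAIRLY_GOOD
```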
async def belaqi_index_rio_hourly(rio_client: IrcelineRioClient, position: Tuple[float, float],
timestamp: datetime | None = None) -> FeatureValue:
"""
Get current BelAQI index value for the given position using the rio_client
Raises ValueError if one or more components are not available
:param rio_client: client for the RIO WFS service
:param position: position for which to get the data
:param timestamp: desired time for the data (now if None)
:return: BelAQI index value for the position at the time
"""
if timestamp is None:
timestamp = datetime.utcnow()
features = [RioFeature.PM10_HMEAN, RioFeature.PM25_HMEAN, RioFeature.O3_HMEAN, RioFeature.NO2_HMEAN]
components = await rio_client.get_data(
timestamp=timestamp,
features=features,
position=position
)
ts = min([components.get(f, {}).get('timestamp') for f in features
if components.get(f, {}).get('timestamp') is not None])
belaqi = belaqi_index_hourly(
pm10=components.get(RioFeature.PM10_HMEAN, {}).get('value', -1),
pm25=components.get(RioFeature.PM25_HMEAN, {}).get('value', -1),
o3=components.get(RioFeature.O3_HMEAN, {}).get('value', -1),
no2=components.get(RioFeature.NO2_HMEAN, {}).get('value', -1)
)
return FeatureValue(timestamp=ts, value=belaqi)
async def belaqi_index_forecast_daily(forecast_client: IrcelineForecastClient, position: Tuple[float, float],
timestamp: date | None = None) -> Dict[date, FeatureValue]:
"""
Get forecasted BelAQI index value for the given position using the forecast_client.
Data is downloaded for the given day and the next four days
The value is None for a given date if one or more components cannot be downloaded
:param forecast_client: client for the forecast data
:param position: position for which to get the data
:param timestamp: day at which the forecasts are issued
:return: dict mapping a day to the forecasted BelAQI index
"""
if timestamp is None:
timestamp = date.today()
components = await forecast_client.get_data(
timestamp=timestamp,
features=[ForecastFeature.PM10_DMEAN,
ForecastFeature.PM25_DMEAN,
ForecastFeature.O3_MAXHMEAN,
ForecastFeature.NO2_MAXHMEAN],
position=position
)
result = dict()
days = {day for _, day in components.keys()}
timestamps = {v.get('timestamp') for v in components.values() if v.get('timestamp') is not None}
timestamp = min(timestamps)
for day in days:
try:
belaqi = belaqi_index_daily(
pm10=components.get((ForecastFeature.PM10_DMEAN, day), {}).get('value', -1),
pm25=components.get((ForecastFeature.PM25_DMEAN, day), {}).get('value', -1),
o3=components.get((ForecastFeature.O3_MAXHMEAN, day), {}).get('value', -1) * O3_MAX_HMEAN_TO_MAX8HMEAN,
no2=components.get((ForecastFeature.NO2_MAXHMEAN, day), {}).get('value', -1) * NO2_MAX_HMEAN_TO_DMEAN
)
result[day] = FeatureValue(timestamp=timestamp, value=belaqi)
except (ValueError, TypeError):
result[day] = FeatureValue(timestamp=timestamp, value=None)
return result


@@ -31,10 +31,13 @@ class RioFeature(IrcelineFeature):
class ForecastFeature(IrcelineFeature):
NO2_MAXHMEAN = 'chimere_no2_maxhmean'
O3_MAXHMEAN = 'chimere_o3_maxhmean'
PM10_DMEAN = 'chimere_pm10_dmean'
PM25_DMEAN = 'chimere_pm25_dmean'
NO2_MAXHMEAN = 'forecast:no2_maxhmean'
NO2_DMEAN = 'forecast:no2_dmean'
O3_MAXHMEAN = 'forecast:o3_maxhmean'
O3_MAX8HMEAN = 'forecast:o3_max8hmean'
PM10_DMEAN = 'forecast:pm10_dmean'
PM25_DMEAN = 'forecast:pm25_dmean'
BELAQI = 'forecast:belaqi'
class BelAqiIndex(Enum):
@@ -52,5 +55,5 @@ class BelAqiIndex(Enum):
class FeatureValue(TypedDict):
# Timestamp at which the value was computed
timestamp: datetime | date
timestamp: datetime | date | None
value: int | float | BelAqiIndex | None


@@ -0,0 +1,75 @@
from datetime import date, timedelta, datetime
from itertools import product
from typing import List, Tuple, Dict, Set
from xml.etree import ElementTree
from aiohttp import ClientResponse, ClientResponseError
from .api import IrcelineBaseClient, _forecast_wms_base_url, IrcelineApiError
from .data import ForecastFeature, FeatureValue
class IrcelineForecastClient(IrcelineBaseClient):
_epsilon = 0.00001
async def get_data(self,
features: List[ForecastFeature],
position: Tuple[float, float]
) -> Dict[Tuple[ForecastFeature, date], FeatureValue]:
"""
Get forecasted concentrations for the given features at the given position. The forecasts are downloaded for
the current day and the next 3 days
:param features: pollutants to get the forecasts for
:param position: (lat, long)
:return: dict where key is (ForecastFeature, date of the forecast) and value is a FeatureValue
"""
timestamp = date.today()
result = dict()
lat, lon = position
base_querystring = {"service": "WMS",
"version": "1.1.1",
"request": "GetFeatureInfo",
"info_format": "application/json",
"width": "1",
"height": "1",
"srs": "EPSG:4326",
"bbox": f"{lon},{lat},{lon + self._epsilon},{lat + self._epsilon}",
"X": "1",
"Y": "1"}
for feature, d in product(features, range(4)):
querystring = base_querystring | {"layers": f"{feature}_d{d}",
"query_layers": f"{feature}_d{d}"}
try:
r: ClientResponse = await self._api_wrapper(_forecast_wms_base_url, querystring)
r: dict = await r.json()
result[(feature, timestamp + timedelta(days=d))] = FeatureValue(
value=r.get('features', [{}])[0].get('properties', {}).get('GRAY_INDEX'),
timestamp=datetime.fromisoformat(r.get('timeStamp')) if 'timeStamp' in r else None)
except (IrcelineApiError, ClientResponseError, IndexError):
result[(feature, timestamp + timedelta(days=d))] = FeatureValue(value=None, timestamp=None)
return result
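To make the WMS call above concrete: for every (feature, day) pair the client issues one GetFeatureInfo request over a 1×1 pixel bounding box around the position and reads the `GRAY_INDEX` property from the JSON reply. A standalone sketch of a single such request with plain aiohttp (the layer name and coordinates are illustrative):

```python
import aiohttp
import asyncio

FORECAST_WMS = 'https://geo.irceline.be/forecast/wms'


async def get_feature_info(lat: float, lon: float, layer: str) -> float | None:
    """Query a 1x1 pixel bbox around (lat, lon) and return the GRAY_INDEX value, if any."""
    eps = 0.00001
    params = {"service": "WMS", "version": "1.1.1", "request": "GetFeatureInfo",
              "info_format": "application/json", "width": "1", "height": "1",
              "srs": "EPSG:4326", "bbox": f"{lon},{lat},{lon + eps},{lat + eps}",
              "X": "1", "Y": "1", "layers": layer, "query_layers": layer}
    async with aiohttp.ClientSession() as session:
        async with session.get(FORECAST_WMS, params=params) as response:
            data = await response.json()
    features = data.get('features', [])
    return features[0].get('properties', {}).get('GRAY_INDEX') if features else None


if __name__ == '__main__':
    # e.g. tomorrow's O3 max hourly mean forecast over Brussels
    print(asyncio.run(get_feature_info(50.85, 4.35, 'forecast:o3_maxhmean_d1')))
```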
async def get_capabilities(self) -> Set[str]:
"""
Fetch the list of possible features from the WMS server
:return: set of features available on the WMS server
"""
querystring = {"service": "WMS",
"version": "1.1.1",
"request": "GetCapabilities"}
r: ClientResponse = await self._api_wrapper(_forecast_wms_base_url, querystring)
return self._parse_capabilities(await r.text())
@staticmethod
def _parse_capabilities(xml_string: str) -> Set[str]:
try:
root = ElementTree.fromstring(xml_string)
except ElementTree.ParseError:
return set()
path = './/Capability/Layer/Layer/Name'
feature_type_names = {t.text for t in root.findall(path)}
return feature_type_names

src/open_irceline/rio.py (new file, 126 lines)

@@ -0,0 +1,126 @@
from datetime import datetime, date, UTC, timedelta
from typing import List, Tuple, Dict, Set
from xml.etree import ElementTree
from aiohttp import ClientResponse
from .api import IrcelineBaseClient, _rio_wfs_base_url, IrcelineApiError
from .data import RioFeature, FeatureValue
from .utils import epsg_transform
class IrcelineRioClient(IrcelineBaseClient):
"""API client for RIO interpolated IRCEL - CELINE open data"""
async def get_data(self,
features: List[RioFeature],
position: Tuple[float, float],
timestamp: datetime | date | None = None
) -> Dict[RioFeature, FeatureValue]:
"""
Call the WFS API to get the interpolated level of each RioFeature. Raises an exception upon API error
:param timestamp: datetime for which to get the data (defaults to the current UTC time)
:param features: list of RioFeature to fetch from the API
:param position: decimal degrees pair of coordinates
:return: dict with the response (key is RioFeature, value is FeatureValue with actual value and timestamp)
"""
if timestamp is None:
timestamp = datetime.now(UTC)
# Remove one hour/day from the timestamp to handle the case where the hour has just passed but the data is not
# yet there (e.g. it is 5.01 PM, but the most recent data is for 4.00 PM)
if isinstance(timestamp, datetime):
timestamp = timestamp.replace(microsecond=0, second=0, minute=0) - timedelta(hours=1)
timestamp = timestamp.isoformat()
key = 'timestamp'
elif isinstance(timestamp, date):
timestamp = timestamp - timedelta(days=1)
timestamp = timestamp.isoformat()
key = 'date'
else:
raise IrcelineApiError(f"Wrong parameter type for timestamp: {type(timestamp)}")
lat, lon = epsg_transform(position)
querystring = {"service": "WFS",
"version": "1.3.0",
"request": "GetFeature",
"outputFormat": "application/json",
"typeName": ",".join(features),
"cql_filter":
f"{key}>='{timestamp}'"
f" AND "
f"INTERSECTS(the_geom, POINT ({lat} {lon}))"}
r: ClientResponse = await self._api_wrapper(_rio_wfs_base_url, querystring)
return self._format_result('rio', await r.json(), features)
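For reference, the request built above reduces to a single WFS GetFeature call with a CQL filter on time and geometry. A standalone sketch with plain aiohttp, assuming the coordinates have already been converted by `epsg_transform` to the CRS of `the_geom` and using an illustrative layer name:

```python
import aiohttp
import asyncio
from datetime import UTC, datetime, timedelta

RIO_WFS = 'https://geo.irceline.be/wfs'


async def get_rio_features(x: float, y: float, typename: str = 'rio:no2_hmean') -> dict:
    """Fetch the RIO features intersecting (x, y) measured since the previous full hour."""
    since = (datetime.now(UTC).replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)).isoformat()
    params = {"service": "WFS", "version": "1.3.0", "request": "GetFeature",
              "outputFormat": "application/json", "typeName": typename,
              "cql_filter": f"timestamp>='{since}' AND INTERSECTS(the_geom, POINT ({x} {y}))"}
    async with aiohttp.ClientSession() as session:
        async with session.get(RIO_WFS, params=params) as response:
            return await response.json()
```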
async def get_capabilities(self) -> Set[str]:
"""
Fetch the list of possible features from the WFS server
:return: set of features available on the WFS server
"""
querystring = {"service": "WFS",
"version": "1.3.0",
"request": "GetCapabilities"}
r: ClientResponse = await self._api_wrapper(_rio_wfs_base_url, querystring)
return self._parse_capabilities(await r.text())
@staticmethod
def _parse_capabilities(xml_string: str) -> Set[str]:
"""
From an XML string obtained with GetCapabilities, generate a set of feature names
:param xml_string: XML string to parse
:return: set of FeatureType Names found in the XML document
"""
try:
root = ElementTree.fromstring(xml_string)
except ElementTree.ParseError:
return set()
# noinspection HttpUrlsUsage
# We never connect to the URL, it is just the namespace in the XML
namespaces = {
'wfs': 'http://www.opengis.net/wfs',
}
path = './/wfs:FeatureTypeList/wfs:FeatureType/wfs:Name'
feature_type_names = {t.text for t in root.findall(path, namespaces)}
return feature_type_names
@staticmethod
def _format_result(prefix: str, data: dict, features: List[RioFeature]) -> dict:
"""
Format the JSON dict returned by the WFS service into a more practical dict that keeps only the latest measurement
for each requested feature
:param prefix: namespace of the feature (e.g. rio), without the colon
:param data: JSON dict value as returned by the API
:param features: RioFeatures wanted in the final dict
:return: reduced dict, key is RioFeature, value is FeatureValue
"""
if data.get('type', None) != 'FeatureCollection' or not isinstance(data.get('features', None), list):
return dict()
features_api = data.get('features', [])
result = dict()
for f in features_api:
props = f.get('properties', {})
if (f.get('id', None) is None or
props.get('value', None) is None):
continue
if (props.get('timestamp', None) is None and
props.get('date', None) is None):
continue
try:
if 'timestamp' in props.keys():
timestamp = datetime.fromisoformat(props.get('timestamp'))
else:
# Cut last character as the date is written '2024-06-15Z' which is not ISO compliant
timestamp = date.fromisoformat(props.get('date')[:-1])
value = float(props.get('value'))
except (TypeError, ValueError):
continue
name = f"{prefix}:{f.get('id').split('.')[0]}"
if name not in [f'{f}' for f in features]:
continue
if name not in result or result[name]['timestamp'] < timestamp:
result[name] = FeatureValue(timestamp=timestamp, value=value)
return result
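A minimal illustration of `_format_result` on a hand-made FeatureCollection with two measurements for the same layer; the ids and timestamps are invented, and it is assumed that `RioFeature.NO2_HMEAN` maps to the `rio:no2_hmean` layer (the RioFeature values are not shown in this diff):

```python
from datetime import datetime

from open_irceline import IrcelineRioClient, RioFeature

data = {
    "type": "FeatureCollection",
    "features": [
        {"id": "no2_hmean.1", "properties": {"timestamp": "2024-06-30T12:00:00+00:00", "value": 11.5}},
        {"id": "no2_hmean.2", "properties": {"timestamp": "2024-06-30T13:00:00+00:00", "value": 13.0}},
    ],
}
result = IrcelineRioClient._format_result('rio', data, [RioFeature.NO2_HMEAN])
# Only the most recent measurement per requested feature is kept
assert result == {'rio:no2_hmean': {'timestamp': datetime.fromisoformat("2024-06-30T13:00:00+00:00"),
                                    'value': 13.0}}
```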


@@ -27,28 +27,3 @@ def get_mock_session(json_file=None, text_file=None):
mock_session.request = AsyncMock(return_value=mock_response)
return mock_session
def create_mock_response(*args, **kwargs):
etag = 'my-etag-here'
mock_response = Mock()
if '20240619' not in kwargs.get('url', ''):
mock_response.status = 404
mock_response.raise_for_status = Mock(side_effect=aiohttp.ClientResponseError(Mock(), Mock()))
elif etag in kwargs.get('headers', {}).get('If-None-Match', ''):
mock_response.text = AsyncMock(return_value='')
mock_response.status = 304
else:
mock_response.text = AsyncMock(return_value=get_api_data('forecast.csv', plain=True))
mock_response.status = 200
if '20240619' in kwargs.get('url', ''):
mock_response.headers = {'ETag': etag}
else:
mock_response.headers = dict()
return mock_response
def get_mock_session_many_csv():
mock_session = Mock(aiohttp.ClientSession)
mock_session.request = AsyncMock(side_effect=create_mock_response)
return mock_session

File diff suppressed because it is too large


@@ -0,0 +1,17 @@
{
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"id": "",
"geometry": null,
"properties": {
"GRAY_INDEX": 10.853286743164062
}
}
],
"totalFeatures": "unknown",
"numberReturned": 1,
"timeStamp": "2024-06-30T13:00:21.520Z",
"crs": null
}


@@ -0,0 +1,17 @@
{
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"id": "",
"geometry": null,
"properties": {
"WRONG": 10.853286743164062
}
}
],
"totalFeatures": "unknown",
"numberReturned": 1,
"timeStamp": "2024-06-30T13:00:21.520Z",
"crs": null
}


@@ -0,0 +1,8 @@
{
"type": "FeatureCollection",
"features": [],
"totalFeatures": "unknown",
"numberReturned": 1,
"timeStamp": "2024-06-30T13:00:21.520Z",
"crs": null
}

tests/test_api_forecast.py (new file, 125 lines)

@@ -0,0 +1,125 @@
from datetime import datetime
from itertools import product
from unittest.mock import call
from freezegun import freeze_time
from src.open_irceline import IrcelineForecastClient, ForecastFeature, FeatureValue
from src.open_irceline.api import _user_agent, _forecast_wms_base_url
from tests.conftest import get_api_data, get_mock_session
def test_parse_capabilities():
data = get_api_data('forecast_wms_capabilities.xml', plain=True)
result = IrcelineForecastClient._parse_capabilities(data)
expected = {'o3_maxhmean_wl_d3', 'pm25_dmean_wl_d0', 'o3_max8hmean_chimv2022_d2', 'no2_maxhmean_tf_d2',
'belaqi_forecast_chimv2022_d2', 'pm25_dmean_chimv2022_d3', 'pm10_dmean_chimv2022_d0',
'no2_maxhmean_wl_d0', 'no2_maxhmean_d2', 'no2_dmean_chimv2022_d2', 'o3_maxhmean_chimv2022_d3',
'pm25_dmean_wl_d3', 'o3_maxhmean_chimv2022_d0', 'pm25_dmean', 'pm25_dmean_tf_d0', 'no2_dmean_wl_d2',
'o3_max8hmean_chimv2022_d3', 'pm25_dmean_d2', 'o3_max8hmean_chimv2022_d0', 'o3_maxhmean_wl_d2',
'no2_maxhmean_wl_d1', 'pm10_dmean_tf_d2', 'pm25_dmean_d1', 'o3_maxhmean_chimv2022_d2',
'pm10_dmean_chimv2022_d2', 'o3_maxhmean_vl', 'belaqi_wl_d2', 'pm10_dmean_wl', 'pm10_dmean_d2',
'no2_dmean_wl_d0', 'no2_dmean_d1', 'o3_maxhmean_d2', 'o3_maxhmean_wl', 'pm25_dmean_wl_d2',
'o3_maxhmean_d3', 'o3_max8hmean_wl_d3', 'belaqi_d0', 'no2_maxhmean_wl_d2', 'no2_maxhmean_wl',
'pm10_dmean_wl_d1', 'no2_dmean_chimv2022_d3', 'o3_maxhmean_tf_d1', 'pm25_dmean_vl', 'pm10_dmean_d0',
'o3_max8hmean_d0', 'o3_max8hmean_d2', 'no2_maxhmean_vl', 'o3_max8hmean_chimv2022_d1', 'pm10_dmean',
'pm10_dmean_wl_d2', 'euaqi_d3', 'belaqi_d1', 'o3_max8hmean_d1', 'o3_maxhmean_chimv2022_d1', 'belaqi_vl',
'belaqi_wl_d0', 'no2_dmean_chimv2022_d0', 'pm25_dmean_wl_d1', 'pm25_dmean_tf_d2', 'no2_dmean_d2',
'o3_maxhmean', 'belaqi_wl', 'no2_maxhmean_d0', 'no2_maxhmean_d3', 'o3_max8hmean_d3', 'euaqi_forecast',
'o3_max8hmean_wl_d1', 'pm10_dmean_chimv2022_d3', 'no2_maxhmean_wl_d3', 'o3_maxhmean_d1',
'no2_dmean_wl_d1', 'o3_maxhmean_wl_d1', 'no2_dmean_d3', 'belaqi_d3', 'belaqi', 'pm25_dmean_d3',
'belaqi_forecast', 'no2_dmean_d0', 'pm25_dmean_chimv2022_d1', 'belaqi_wl_d1', 'pm10_dmean_d3',
'no2_dmean_wl_d3', 'pm25_dmean_tf_d1', 'euaqi_d0', 'o3_maxhmean_wl_d0', 'belaqi_forecast_chimv2022_d3',
'no2_dmean_chimv2022_d1', 'o3_max8hmean_wl_d0', 'o3_max8hmean_wl_d2', 'pm10_dmean_chimv2022_d1',
'pm10_dmean_wl_d3', 'pm25_dmean_wl', 'belaqi_forecast_chimv2022_d1', 'euaqi_d2', 'pm10_dmean_d1',
'belaqi_wl_d3', 'belaqi_forecast_chimv2022_d0', 'o3_maxhmean_tf_d0', 'euaqi_d1', 'no2_maxhmean',
'pm25_dmean_chimv2022_d2', 'belaqi_d2', 'pm25_dmean_d0', 'no2_maxhmean_tf_d0', 'pm10_dmean_tf_d0',
'pm25_dmean_chimv2022_d0', 'o3_maxhmean_d0', 'pm10_dmean_tf_d1', 'pm10_dmean_vl', 'no2_maxhmean_tf_d1',
'o3_maxhmean_tf_d2', 'pm10_dmean_wl_d0', 'no2_maxhmean_d1'}
assert result == expected
for f, d in product(ForecastFeature, range(4)):
assert f"{f.split(':')[1]}_d{d}" in result
async def test_aget_capabilities():
session = get_mock_session(text_file='forecast_wms_capabilities.xml')
client = IrcelineForecastClient(session)
_ = await client.get_capabilities()
session.request.assert_called_once_with(
method='GET',
url=_forecast_wms_base_url,
params={"service": "WMS",
"version": "1.1.1",
"request": "GetCapabilities"},
headers={'User-Agent': _user_agent}
)
@freeze_time(datetime.fromisoformat("2024-06-30T13:00:21.520Z"))
async def test_api_forecast_error():
pos = (50.4657, 4.8647)
session = get_mock_session('forecast_wms_feature_info_invalid.json')
client = IrcelineForecastClient(session)
features = [ForecastFeature.NO2_DMEAN, ForecastFeature.O3_MAXHMEAN]
result = await client.get_data(features, pos)
for k, v in result.items():
assert v == FeatureValue(timestamp=datetime.fromisoformat("2024-06-30T13:00:21.520Z"), value=None)
async def test_api_forecast_no_field():
pos = (50.4657, 4.8647)
session = get_mock_session('forecast_wms_feature_info_no_field.json')
client = IrcelineForecastClient(session)
features = [ForecastFeature.NO2_DMEAN, ForecastFeature.O3_MAXHMEAN]
result = await client.get_data(features, pos)
for k, v in result.items():
assert v == FeatureValue(timestamp=None, value=None)
async def test_api_forecast():
pos = (50.4657, 4.8647)
lat, lon = pos
session = get_mock_session('forecast_wms_feature_info.json')
client = IrcelineForecastClient(session)
features = [ForecastFeature.NO2_DMEAN, ForecastFeature.O3_MAXHMEAN]
_ = await client.get_data(features, pos)
base = {"service": "WMS",
"version": "1.1.1",
"request": "GetFeatureInfo",
"info_format": "application/json",
"width": "1",
"height": "1",
"srs": "EPSG:4326",
"bbox": f"{lon},{lat},{lon + 0.00001},{lat + 0.00001}",
"X": "1",
"Y": "1"}
calls = [call(
method='GET',
url=_forecast_wms_base_url,
params=base | {"layers": f"{feature}_d{d}",
"query_layers": f"{feature}_d{d}"},
headers={'User-Agent': _user_agent},
)
for feature, d in product(features, range(4))]
session.request.assert_has_calls(calls, any_order=True)
def test_parse_capabilities_with_error():
result = IrcelineForecastClient._parse_capabilities("wow there no valid XML")
assert result == set()


@@ -1,90 +0,0 @@
from datetime import date
from unittest.mock import call
from src.open_irceline.api import _forecast_base_url, _user_agent
from src.open_irceline.api import IrcelineForecastClient
from src.open_irceline.data import ForecastFeature
from tests.conftest import get_api_data, get_mock_session_many_csv
def test_extract_from_csv():
data = get_api_data('forecast.csv', plain=True)
x, y = 50.45, 4.85
result = IrcelineForecastClient.extract_result_from_csv(x, y, data)
assert result == 13.0844
result = IrcelineForecastClient.extract_result_from_csv(23, 4, data)
assert result is None
async def test_cached_calls():
session = get_mock_session_many_csv()
client = IrcelineForecastClient(session)
_ = await client.get_data(
timestamp=date(2024, 6, 19),
features=[ForecastFeature.NO2_MAXHMEAN],
position=(50.45, 4.85)
)
calls = [
call(method='GET',
url=f"{_forecast_base_url}/BE_{ForecastFeature.NO2_MAXHMEAN}_20240619_d{i}.csv",
params=None,
headers={'User-Agent': _user_agent}
) for i in range(5)
]
assert session.request.call_count == 5
session.request.assert_has_calls(calls)
_ = await client.get_data(
timestamp=date(2024, 6, 19),
features=[ForecastFeature.NO2_MAXHMEAN],
position=(50.45, 4.85)
)
calls += [
call(method='GET',
url=f"{_forecast_base_url}/BE_{ForecastFeature.NO2_MAXHMEAN}_20240619_d{i}.csv",
params=None,
headers={'User-Agent': _user_agent, 'If-None-Match': 'my-etag-here'}
) for i in range(5)
]
assert session.request.call_count == 10
session.request.assert_has_calls(calls)
async def test_missed_cached_calls():
session = get_mock_session_many_csv()
client = IrcelineForecastClient(session)
r = await client.get_data(
timestamp=date(2024, 6, 21),
features=[ForecastFeature.NO2_MAXHMEAN],
position=(50.45, 4.85)
)
calls = list()
for i in range(5):
calls += [
call(method='GET',
url=f"{_forecast_base_url}/BE_{ForecastFeature.NO2_MAXHMEAN}_20240621_d{i}.csv",
params=None,
headers={'User-Agent': _user_agent}
),
call(method='GET',
url=f"{_forecast_base_url}/BE_{ForecastFeature.NO2_MAXHMEAN}_20240620_d{i}.csv",
params=None,
headers={'User-Agent': _user_agent}
)
]
assert session.request.call_count == 10
session.request.assert_has_calls(calls)
for value in r.values():
assert value['value'] is None


@@ -2,7 +2,7 @@ from datetime import datetime, date
from freezegun import freeze_time
from src.open_irceline.api import IrcelineRioClient
from src.open_irceline import IrcelineRioClient
from src.open_irceline.api import _rio_wfs_base_url, _user_agent
from src.open_irceline.data import RioFeature, FeatureValue
from src.open_irceline.utils import epsg_transform
@@ -99,7 +99,7 @@ async def test_api_rio():
d = date(2024, 6, 18)
features = [RioFeature.NO2_HMEAN, RioFeature.O3_HMEAN]
_ = await client.get_data(d, features, pos)
_ = await client.get_data(features, pos, d)
session.request.assert_called_once_with(
method='GET',
url=_rio_wfs_base_url,
@@ -120,7 +120,7 @@ async def test_api_rio_get_capabilities():
session = get_mock_session(text_file='capabilities.xml')
client = IrcelineRioClient(session)
_ = await client.get_rio_capabilities()
_ = await client.get_capabilities()
session.request.assert_called_once_with(
method='GET',


@@ -1,267 +0,0 @@
from datetime import date, timedelta, datetime
import pytest
from freezegun import freeze_time
from src.open_irceline.api import IrcelineForecastClient, IrcelineRioClient
from src.open_irceline.belaqi import belaqi_index_forecast_daily, belaqi_index_rio_hourly, belaqi_index_hourly, \
belaqi_index_daily
from src.open_irceline.data import BelAqiIndex
from tests.conftest import get_mock_session_many_csv, get_mock_session
@pytest.mark.parametrize("pm10, pm25, o3, no2, expected", [
(5, 2, 25, 5, BelAqiIndex.EXCELLENT),
(15, 5, 50, 12, BelAqiIndex.VERY_GOOD),
(30, 9, 70, 18, BelAqiIndex.GOOD),
(40, 13, 80, 25, BelAqiIndex.FAIRLY_GOOD),
(55, 18, 100, 35, BelAqiIndex.MODERATE),
(70, 25, 130, 43, BelAqiIndex.POOR),
(90, 45, 160, 48, BelAqiIndex.VERY_POOR),
(100, 55, 200, 55, BelAqiIndex.BAD),
(130, 70, 230, 70, BelAqiIndex.VERY_BAD),
(150, 80, 250, 80, BelAqiIndex.HORRIBLE),
(150, 80, 300, 80, BelAqiIndex.HORRIBLE),
(95, 5, 25, 5, BelAqiIndex.VERY_POOR),
(145, 5, 25, 5, BelAqiIndex.HORRIBLE),
(5, 55, 25, 5, BelAqiIndex.BAD),
(5, 85, 25, 5, BelAqiIndex.HORRIBLE),
(5, 5, 190, 5, BelAqiIndex.BAD),
(5, 5, 260, 5, BelAqiIndex.HORRIBLE),
(5, 5, 25, 65, BelAqiIndex.VERY_BAD),
(5, 5, 25, 85, BelAqiIndex.HORRIBLE),
(45, 15, 150, 10, BelAqiIndex.POOR),
(20, 25, 180, 15, BelAqiIndex.VERY_POOR),
(10, 7, 250, 70, BelAqiIndex.HORRIBLE),
(110, 3, 30, 25, BelAqiIndex.BAD),
(5, 0, 0, 0, BelAqiIndex.EXCELLENT),
(15, 0, 0, 0, BelAqiIndex.VERY_GOOD),
(30, 0, 0, 0, BelAqiIndex.GOOD),
(40, 0, 0, 0, BelAqiIndex.FAIRLY_GOOD),
(55, 0, 0, 0, BelAqiIndex.MODERATE),
(70, 0, 0, 0, BelAqiIndex.POOR),
(90, 0, 0, 0, BelAqiIndex.VERY_POOR),
(100, 0, 0, 0, BelAqiIndex.BAD),
(130, 0, 0, 0, BelAqiIndex.VERY_BAD),
(150, 0, 0, 0, BelAqiIndex.HORRIBLE),
(0, 2, 0, 0, BelAqiIndex.EXCELLENT),
(0, 5, 0, 0, BelAqiIndex.VERY_GOOD),
(0, 9, 0, 0, BelAqiIndex.GOOD),
(0, 13, 0, 0, BelAqiIndex.FAIRLY_GOOD),
(0, 18, 0, 0, BelAqiIndex.MODERATE),
(0, 25, 0, 0, BelAqiIndex.POOR),
(0, 45, 0, 0, BelAqiIndex.VERY_POOR),
(0, 55, 0, 0, BelAqiIndex.BAD),
(0, 70, 0, 0, BelAqiIndex.VERY_BAD),
(0, 80, 0, 0, BelAqiIndex.HORRIBLE),
(0, 0, 25, 0, BelAqiIndex.EXCELLENT),
(0, 0, 50, 0, BelAqiIndex.VERY_GOOD),
(0, 0, 70, 0, BelAqiIndex.GOOD),
(0, 0, 80, 0, BelAqiIndex.FAIRLY_GOOD),
(0, 0, 100, 0, BelAqiIndex.MODERATE),
(0, 0, 130, 0, BelAqiIndex.POOR),
(0, 0, 160, 0, BelAqiIndex.VERY_POOR),
(0, 0, 200, 0, BelAqiIndex.BAD),
(0, 0, 230, 0, BelAqiIndex.VERY_BAD),
(0, 0, 250, 0, BelAqiIndex.HORRIBLE),
(0, 0, 0, 5, BelAqiIndex.EXCELLENT),
(0, 0, 0, 12, BelAqiIndex.VERY_GOOD),
(0, 0, 0, 18, BelAqiIndex.GOOD),
(0, 0, 0, 25, BelAqiIndex.FAIRLY_GOOD),
(0, 0, 0, 35, BelAqiIndex.MODERATE),
(0, 0, 0, 43, BelAqiIndex.POOR),
(0, 0, 0, 48, BelAqiIndex.VERY_POOR),
(0, 0, 0, 55, BelAqiIndex.BAD),
(0, 0, 0, 70, BelAqiIndex.VERY_BAD),
(0, 0, 0, 80, BelAqiIndex.HORRIBLE)
])
def test_belaqi_index_hourly(pm10, pm25, o3, no2, expected):
assert belaqi_index_hourly(pm10, pm25, o3, no2) == expected
@pytest.mark.parametrize("pm10, pm25, o3, no2, expected_index", [
(5, 0, 0, 0, BelAqiIndex.EXCELLENT),
(15, 0, 0, 0, BelAqiIndex.VERY_GOOD),
(25, 0, 0, 0, BelAqiIndex.GOOD),
(35, 0, 0, 0, BelAqiIndex.FAIRLY_GOOD),
(45, 0, 0, 0, BelAqiIndex.MODERATE),
(60, 0, 0, 0, BelAqiIndex.POOR),
(70, 0, 0, 0, BelAqiIndex.VERY_POOR),
(80, 0, 0, 0, BelAqiIndex.BAD),
(100, 0, 0, 0, BelAqiIndex.VERY_BAD),
(101, 0, 0, 0, BelAqiIndex.HORRIBLE),
(0, 2.5, 0, 0, BelAqiIndex.EXCELLENT),
(0, 5, 0, 0, BelAqiIndex.VERY_GOOD),
(0, 7.5, 0, 0, BelAqiIndex.GOOD),
(0, 10, 0, 0, BelAqiIndex.FAIRLY_GOOD),
(0, 15, 0, 0, BelAqiIndex.MODERATE),
(0, 25, 0, 0, BelAqiIndex.POOR),
(0, 35, 0, 0, BelAqiIndex.VERY_POOR),
(0, 40, 0, 0, BelAqiIndex.BAD),
(0, 50, 0, 0, BelAqiIndex.VERY_BAD),
(0, 51, 0, 0, BelAqiIndex.HORRIBLE),
(0, 0, 30, 0, BelAqiIndex.EXCELLENT),
(0, 0, 60, 0, BelAqiIndex.VERY_GOOD),
(0, 0, 70, 0, BelAqiIndex.GOOD),
(0, 0, 80, 0, BelAqiIndex.FAIRLY_GOOD),
(0, 0, 100, 0, BelAqiIndex.MODERATE),
(0, 0, 130, 0, BelAqiIndex.POOR),
(0, 0, 160, 0, BelAqiIndex.VERY_POOR),
(0, 0, 190, 0, BelAqiIndex.BAD),
(0, 0, 220, 0, BelAqiIndex.VERY_BAD),
(0, 0, 221, 0, BelAqiIndex.HORRIBLE),
(0, 0, 0, 5, BelAqiIndex.EXCELLENT),
(0, 0, 0, 10, BelAqiIndex.VERY_GOOD),
(0, 0, 0, 15, BelAqiIndex.GOOD),
(0, 0, 0, 20, BelAqiIndex.FAIRLY_GOOD),
(0, 0, 0, 25, BelAqiIndex.MODERATE),
(0, 0, 0, 30, BelAqiIndex.POOR),
(0, 0, 0, 35, BelAqiIndex.VERY_POOR),
(0, 0, 0, 40, BelAqiIndex.BAD),
(0, 0, 0, 50, BelAqiIndex.VERY_BAD),
(0, 0, 0, 51, BelAqiIndex.HORRIBLE),
(3, 1, 20, 4, BelAqiIndex.EXCELLENT),
(10, 3, 50, 8, BelAqiIndex.VERY_GOOD),
(20, 6, 65, 12, BelAqiIndex.GOOD),
(30, 8, 75, 18, BelAqiIndex.FAIRLY_GOOD),
(40, 12, 90, 22, BelAqiIndex.MODERATE),
(50, 20, 110, 28, BelAqiIndex.POOR),
(65, 30, 140, 33, BelAqiIndex.VERY_POOR),
(75, 38, 180, 38, BelAqiIndex.BAD),
(90, 45, 200, 45, BelAqiIndex.VERY_BAD),
(110, 55, 230, 55, BelAqiIndex.HORRIBLE),
(3, 30, 20, 8, BelAqiIndex.VERY_POOR),
(110, 6, 65, 12, BelAqiIndex.HORRIBLE),
(3, 6, 230, 12, BelAqiIndex.HORRIBLE),
(3, 6, 65, 55, BelAqiIndex.HORRIBLE),
(50, 5, 65, 12, BelAqiIndex.POOR),
(10, 20, 65, 12, BelAqiIndex.POOR),
(10, 5, 110, 12, BelAqiIndex.POOR),
(10, 5, 65, 28, BelAqiIndex.POOR),
(75, 5, 30, 8, BelAqiIndex.BAD),
(10, 38, 30, 8, BelAqiIndex.BAD),
(10, 5, 180, 8, BelAqiIndex.BAD),
(10, 5, 30, 38, BelAqiIndex.BAD),
(65, 3, 20, 22, BelAqiIndex.VERY_POOR),
(3, 30, 20, 22, BelAqiIndex.VERY_POOR),
(3, 3, 140, 22, BelAqiIndex.VERY_POOR),
(3, 3, 20, 33, BelAqiIndex.VERY_POOR),
(90, 6, 20, 22, BelAqiIndex.VERY_BAD),
(10, 45, 20, 22, BelAqiIndex.VERY_BAD),
(10, 6, 200, 22, BelAqiIndex.VERY_BAD),
(10, 6, 20, 45, BelAqiIndex.VERY_BAD),
(3, 30, 20, 4, BelAqiIndex.VERY_POOR),
(110, 1, 20, 4, BelAqiIndex.HORRIBLE),
(3, 1, 230, 4, BelAqiIndex.HORRIBLE),
(3, 1, 20, 55, BelAqiIndex.HORRIBLE),
(50, 3, 20, 4, BelAqiIndex.POOR),
(3, 20, 20, 4, BelAqiIndex.POOR),
(3, 1, 110, 4, BelAqiIndex.POOR),
(3, 1, 20, 28, BelAqiIndex.POOR),
])
def test_belaqi_index_daily(pm10, pm25, o3, no2, expected_index):
assert belaqi_index_daily(pm10, pm25, o3, no2) == expected_index
def test_belaqi_hourly_value_error():
with pytest.raises(ValueError):
belaqi_index_hourly(-1, 0, 12, 8)
with pytest.raises(ValueError):
belaqi_index_hourly(1, -20, 12, 8)
with pytest.raises(ValueError):
belaqi_index_hourly(1, 0, -12, 8)
with pytest.raises(ValueError):
belaqi_index_hourly(1, 0, 12, -8888)
def test_belaqi_daily_value_error():
with pytest.raises(ValueError):
belaqi_index_daily(-1, 0, 12, 8)
with pytest.raises(ValueError):
belaqi_index_daily(1, -20, 12, 8)
with pytest.raises(ValueError):
belaqi_index_daily(1, 0, -12, 8)
with pytest.raises(ValueError):
belaqi_index_daily(1, 0, 12, -8888)
def test_belaqi_hourly_value_error_none():
with pytest.raises(ValueError):
belaqi_index_hourly(None, 0, 12, 8)
with pytest.raises(ValueError):
belaqi_index_hourly(1, None, 12, 8)
with pytest.raises(ValueError):
belaqi_index_hourly(1, 0, None, 8)
with pytest.raises(ValueError):
belaqi_index_hourly(1, 0, 12, None)
def test_belaqi_daily_value_error_none():
with pytest.raises(ValueError):
belaqi_index_daily(None, 0, 12, 8)
with pytest.raises(ValueError):
belaqi_index_daily(1, None, 12, 8)
with pytest.raises(ValueError):
belaqi_index_daily(1, 0, None, 8)
with pytest.raises(ValueError):
belaqi_index_daily(1, 0, 12, None)
@freeze_time(datetime.fromisoformat("2024-06-19T19:30:09.581Z"))
async def test_belaqi_index_forecast():
session = get_mock_session_many_csv()
client = IrcelineForecastClient(session)
pos = (50.55, 4.85)
result = await belaqi_index_forecast_daily(client, pos)
expected_days = {date(2024, 6, 19) + timedelta(days=i) for i in range(5)}
assert set(result.keys()) == expected_days
for v in result.values():
assert v.get('value') == BelAqiIndex.MODERATE
async def test_belaqi_index_forecast_missing_day():
session = get_mock_session_many_csv()
client = IrcelineForecastClient(session)
pos = (50.55, 4.85)
result = await belaqi_index_forecast_daily(client, pos, date(2024, 6, 21))
expected_days = {date(2024, 6, 21) + timedelta(days=i) for i in range(5)}
assert set(result.keys()) == expected_days
for v in result.values():
assert v.get('value') is None
@freeze_time(datetime.fromisoformat("2024-06-23T12:30:09.581Z"))
async def test_belaqi_index_actual():
session = get_mock_session(json_file='rio_wfs_for_belaqi.json')
client = IrcelineRioClient(session)
pos = (50.55, 4.85)
result = await belaqi_index_rio_hourly(client, pos)
assert result.get('value') == BelAqiIndex.GOOD
@freeze_time(datetime.fromisoformat("2024-06-23T12:30:09.581Z"))
async def test_belaqi_index_actual_missing_value():
session = get_mock_session(json_file='rio_wfs.json')
client = IrcelineRioClient(session)
pos = (50.55, 4.85)
with pytest.raises(ValueError):
_ = await belaqi_index_rio_hourly(client, pos)