mirror of
https://github.com/jdejaegh/python-irceline.git
synced 2025-06-26 19:35:40 +02:00
Inital support for forecast client
This commit is contained in:
parent
12f45d7df7
commit
0fd34a6581
3 changed files with 61 additions and 4 deletions
|
@ -1,6 +1,6 @@
|
|||
from pyproj import Transformer
|
||||
|
||||
project_transform = Transformer.from_crs('EPSG:4326', 'EPSG:31370', always_xy=True)
|
||||
project_transform = Transformer.from_crs('EPSG:4326', 'EPSG:31370', always_xy=False)
|
||||
rio_wfs_base_url = 'https://geo.irceline.be/rio/wfs'
|
||||
# noinspection HttpUrlsUsage
|
||||
# There is not HTTPS version of this endpoint
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
import asyncio
|
||||
import csv
|
||||
import socket
|
||||
from datetime import datetime, timedelta, date
|
||||
from io import StringIO
|
||||
from itertools import product
|
||||
from typing import Tuple, List, Dict, Set
|
||||
from xml.etree import ElementTree
|
||||
|
||||
|
@ -8,8 +11,8 @@ import aiohttp
|
|||
import async_timeout
|
||||
from aiohttp import ClientResponse
|
||||
|
||||
from . import project_transform, rio_wfs_base_url, user_agent
|
||||
from .data import RioFeature, FeatureValue
|
||||
from . import project_transform, rio_wfs_base_url, user_agent, forecast_base_url
|
||||
from .data import RioFeature, FeatureValue, ForecastFeature
|
||||
from .utils import SizedDict
|
||||
|
||||
|
||||
|
@ -200,3 +203,50 @@ class IrcelineRioClient(IrcelineBaseClient):
|
|||
|
||||
return result
|
||||
|
||||
|
||||
class IrcelineForecastClient(IrcelineBaseClient):
|
||||
"""API client for forecast IRCEL - CELINE open data"""
|
||||
|
||||
@staticmethod
|
||||
def _round_coordinates(x: float, y: float, step=.05):
|
||||
n = 1 / step
|
||||
return round(x * n) / n, round(y * n) / n
|
||||
|
||||
async def get_forecast(self, day: date, features: List[ForecastFeature], position: Tuple[float, float]) -> dict:
|
||||
x, y = self._round_coordinates(position[0], position[1])
|
||||
|
||||
result = dict()
|
||||
|
||||
for feature, d in product(features, range(5)):
|
||||
url = f"{forecast_base_url}/BE_{feature}_{day.strftime('%Y%m%d')}_d{d}.csv"
|
||||
try:
|
||||
r: ClientResponse = await self._api_cached_wrapper(url)
|
||||
ts = day
|
||||
except IrcelineApiError:
|
||||
# retry for the day before
|
||||
yesterday = day - timedelta(days=1)
|
||||
url = f"{forecast_base_url}/BE_{feature}_{yesterday.strftime('%Y%m%d')}_d{d}.csv"
|
||||
try:
|
||||
r: ClientResponse = await self._api_cached_wrapper(url)
|
||||
ts = yesterday
|
||||
except IrcelineApiError:
|
||||
# if it fails twice, just set None and go to the next
|
||||
result[(feature, day + timedelta(days=d))] = FeatureValue(value=None, timestamp=day)
|
||||
continue
|
||||
|
||||
result[(feature, ts + timedelta(days=d))] = FeatureValue(
|
||||
value=self.extract_result_from_csv(x, y, await r.text()),
|
||||
timestamp=ts)
|
||||
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def extract_result_from_csv(x: float, y: float, csv_text: str) -> float | None:
|
||||
f = StringIO(csv_text)
|
||||
for row in csv.reader(f, delimiter=';'):
|
||||
try:
|
||||
if x == float(row[1]) and y == float(row[2]):
|
||||
return float(row[3])
|
||||
except ValueError:
|
||||
continue
|
||||
return None
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
from datetime import datetime, date
|
||||
from enum import StrEnum
|
||||
from typing import TypedDict
|
||||
from datetime import datetime, date
|
||||
|
||||
|
||||
class RioFeature(StrEnum):
|
||||
|
@ -26,6 +26,13 @@ class RioFeature(StrEnum):
|
|||
SO2_HMEAN = 'rio:so2_hmean'
|
||||
|
||||
|
||||
class ForecastFeature(StrEnum):
|
||||
NO2_MAXHMEAN = 'chimere_no2_maxhmean'
|
||||
O3_MAXHMEAN = 'chimere_o3_maxhmean'
|
||||
PM10_DMEAN = 'chimere_pm10_dmean'
|
||||
PM25_DMEAN = 'chimere_pm25_dmean'
|
||||
|
||||
|
||||
class FeatureValue(TypedDict):
|
||||
timestamp: datetime | date
|
||||
value: int | float | None
|
||||
|
|
Loading…
Add table
Reference in a new issue