372 lines
13 KiB
Python
372 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Fetches weather from *no-key* providers, normalizes units, aggregates to a daily consensus,
|
|
and writes JSON outputs into /var/www/WEATHER/public_html/data/.
|
|
|
|
Providers:
|
|
- Open-Meteo (no key) -> daily temperature_2m_max (C/F) [https://open-meteo.com]
|
|
- api.weather.gov (no key; User-Agent required) -> periods [https://api.weather.gov]
|
|
- MET Norway Locationforecast (no key; User-Agent) -> hourly [https://api.met.no]
|
|
- 7Timer! civillight (no key) -> daily max/min [http://www.7timer.info]
|
|
|
|
Outputs:
|
|
data/raw/<provider>_<YYYY-MM-DD>_<Location>.json (raw)
|
|
data/aggregated_<YYYY-MM-DD>_<Location>.json (aggregated daily consensus)
|
|
data/latest.json (copy of aggregated)
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
from datetime import datetime as dt
|
|
from datetime import timezone
|
|
from collections import defaultdict
|
|
|
|
from zoneinfo import ZoneInfo # stdlib (Python 3.9+)
|
|
import requests
|
|
|
|
# ---------------------- Utilities ----------------------
|
|
|
|
def here(*parts):
|
|
return os.path.join(os.path.dirname(os.path.abspath(__file__)), *parts)
|
|
|
|
def load_config():
|
|
conf_path = here("config.json")
|
|
if not os.path.exists(conf_path):
|
|
raise FileNotFoundError(
|
|
f"Missing config.json at {conf_path}. Copy config.example.json and fill in your values."
|
|
)
|
|
with open(conf_path, "r") as f:
|
|
return json.load(f)
|
|
|
|
def ensure_dirs(output_dir):
|
|
raw_dir = os.path.join(output_dir, "raw")
|
|
os.makedirs(raw_dir, exist_ok=True)
|
|
return raw_dir
|
|
|
|
def date_str_utc():
|
|
return dt.now(timezone.utc).strftime("%Y-%m-%d")
|
|
|
|
def c_to_f(c): return (float(c) * 9.0/5.0) + 32.0
|
|
def f_to_c(f): return (float(f) - 32.0) * 5.0/9.0
|
|
|
|
def normalize_temp(value, target_units, source_units):
|
|
"""Return temp in target_units ('imperial' or 'metric') given source_units."""
|
|
if value is None:
|
|
return None
|
|
if target_units == source_units:
|
|
return float(value)
|
|
if source_units == "metric" and target_units == "imperial":
|
|
return c_to_f(value)
|
|
if source_units == "imperial" and target_units == "metric":
|
|
return f_to_c(value)
|
|
# default passthrough
|
|
return float(value)
|
|
|
|
def iso_to_local_date(iso_ts, tz_name):
|
|
"""
|
|
Convert ISO-8601 string to local YYYY-MM-DD using zoneinfo tz.
|
|
Accepts Z or +00:00. Returns date string.
|
|
"""
|
|
if iso_ts.endswith("Z"):
|
|
iso_ts = iso_ts.replace("Z", "+00:00")
|
|
dt_utc = dt.fromisoformat(iso_ts)
|
|
if dt_utc.tzinfo is None:
|
|
dt_utc = dt_utc.replace(tzinfo=timezone.utc)
|
|
local = dt_utc.astimezone(ZoneInfo(tz_name))
|
|
return local.strftime("%Y-%m-%d")
|
|
|
|
# ---------------------- Providers ----------------------
|
|
|
|
def fetch_open_meteo(provider_conf, lat, lon, days, units, tz_name):
|
|
"""
|
|
Open-Meteo: request daily max temperature. Supports temperature_unit and timezone parameters.
|
|
No API key required.
|
|
"""
|
|
url = provider_conf["base_url"]
|
|
params = {
|
|
"latitude": lat,
|
|
"longitude": lon,
|
|
"daily": "temperature_2m_max",
|
|
"timezone": tz_name or "auto",
|
|
"temperature_unit": "fahrenheit" if units == "imperial" else "celsius"
|
|
}
|
|
r = requests.get(url, params=params, timeout=20)
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
|
|
result = []
|
|
daily = (data.get("daily") or {})
|
|
times = daily.get("time") or []
|
|
highs = daily.get("temperature_2m_max") or []
|
|
for t, h in list(zip(times, highs))[:days]:
|
|
if h is None:
|
|
continue
|
|
result.append({
|
|
"date": t, # already aligned to requested timezone
|
|
"high": round(float(h), 1),
|
|
"units": units,
|
|
"provider": "open-meteo"
|
|
})
|
|
return result, data
|
|
|
|
def fetch_nws(provider_conf, lat, lon, days, units, tz_name, user_agent):
|
|
"""
|
|
NWS api.weather.gov:
|
|
1) /points/{lat},{lon} -> get 'forecast' URL
|
|
2) GET forecast -> 'properties.periods' (local time). Temperatures usually in Fahrenheit.
|
|
Requires a User-Agent header.
|
|
"""
|
|
base = provider_conf["base_url"].rstrip("/")
|
|
headers = {"User-Agent": user_agent}
|
|
# Metadata to find forecast URL
|
|
meta_url = f"{base}/points/{round(lat,4)},{round(lon,4)}"
|
|
r_meta = requests.get(meta_url, headers=headers, timeout=20)
|
|
r_meta.raise_for_status()
|
|
meta = r_meta.json()
|
|
|
|
forecast_url = (meta.get("properties") or {}).get("forecast")
|
|
if not forecast_url:
|
|
return [], {"points": meta}
|
|
|
|
r_fcst = requests.get(forecast_url, headers=headers, timeout=20)
|
|
r_fcst.raise_for_status()
|
|
fcst = r_fcst.json()
|
|
|
|
by_date = defaultdict(list)
|
|
for p in (fcst.get("properties") or {}).get("periods", []):
|
|
start = p.get("startTime")
|
|
temp = p.get("temperature")
|
|
unit = p.get("temperatureUnit") # e.g., 'F'
|
|
if start is None or temp is None:
|
|
continue
|
|
# Convert to configured timezone day
|
|
day_key = iso_to_local_date(start, tz_name)
|
|
# Normalize temperature to requested display units
|
|
src_units = "imperial" if str(unit).upper() == "F" else "metric"
|
|
temp_norm = normalize_temp(temp, target_units=units, source_units=src_units)
|
|
by_date[day_key].append(temp_norm)
|
|
|
|
result = []
|
|
for day in sorted(by_date.keys())[:days]:
|
|
# Take the max observed temp for that calendar day
|
|
vals = [v for v in by_date[day] if isinstance(v, (int, float))]
|
|
if not vals:
|
|
continue
|
|
result.append({
|
|
"date": day,
|
|
"high": round(max(vals), 1),
|
|
"units": units,
|
|
"provider": "nws"
|
|
})
|
|
# Return both combined raw (meta + forecast)
|
|
return result, {"points": meta, "forecast": fcst}
|
|
|
|
def fetch_metno(provider_conf, lat, lon, days, units, tz_name, user_agent):
|
|
"""
|
|
MET Norway Locationforecast/2.0 compact (GeoJSON-like).
|
|
'properties.timeseries' with hourly 'instant.details.air_temperature' (°C).
|
|
User-Agent is required by policy.
|
|
"""
|
|
url = provider_conf["base_url"]
|
|
headers = {"User-Agent": user_agent}
|
|
params = {"lat": round(lat, 4), "lon": round(lon, 4)}
|
|
r = requests.get(url, headers=headers, params=params, timeout=20)
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
|
|
by_date = defaultdict(list)
|
|
for item in (data.get("properties") or {}).get("timeseries", []):
|
|
t = item.get("time")
|
|
details = ((item.get("data") or {}).get("instant") or {}).get("details") or {}
|
|
temp_c = details.get("air_temperature")
|
|
if t is None or temp_c is None:
|
|
continue
|
|
# Convert timestamp to local calendar day for bucketing
|
|
day_key = iso_to_local_date(t, tz_name)
|
|
# Normalize to requested units
|
|
val = normalize_temp(temp_c, target_units=units, source_units="metric")
|
|
by_date[day_key].append(val)
|
|
|
|
result = []
|
|
for day in sorted(by_date.keys())[:days]:
|
|
vals = [v for v in by_date[day] if isinstance(v, (int, float))]
|
|
if not vals:
|
|
continue
|
|
result.append({
|
|
"date": day,
|
|
"high": round(max(vals), 1),
|
|
"units": units,
|
|
"provider": "metno"
|
|
})
|
|
return result, data
|
|
|
|
def fetch_seventimer(provider_conf, lat, lon, days, units, tz_name):
|
|
"""
|
|
7Timer! civillight:
|
|
JSON shape: { product:'civillight', init:'YYYYMMDDHH', dataseries:[ { date:YYYYMMDD, temp2m:{max,min}, ... } ] }
|
|
Values are in metric by default; we request metric explicitly.
|
|
"""
|
|
url = provider_conf["base_url"]
|
|
params = {
|
|
"lon": lon,
|
|
"lat": lat,
|
|
"product": "civillight",
|
|
"output": "json",
|
|
"unit": "metric"
|
|
}
|
|
r = requests.get(url, params=params, timeout=20)
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
|
|
result = []
|
|
for d in (data.get("dataseries") or [])[:days]:
|
|
date_raw = d.get("date") # e.g. 20250210
|
|
t2m = d.get("temp2m") or {}
|
|
tmax_c = t2m.get("max")
|
|
if date_raw is None or tmax_c is None:
|
|
continue
|
|
# 7Timer uses YYYYMMDD integer; convert to ISO date string
|
|
s = str(date_raw)
|
|
day = f"{s[0:4]}-{s[4:6]}-{s[6:8]}"
|
|
high = normalize_temp(tmax_c, target_units=units, source_units="metric")
|
|
result.append({
|
|
"date": day,
|
|
"high": round(float(high), 1),
|
|
"units": units,
|
|
"provider": "7timer"
|
|
})
|
|
return result, data
|
|
|
|
# ---------------------- Aggregation ----------------------
|
|
|
|
def aggregate_daily_highs(provider_forecasts, units, max_days=None):
|
|
"""
|
|
Combine daily highs across providers.
|
|
Output per date: providers[], consensus{mean, median, spread, provider_count}, units
|
|
"""
|
|
by_date = defaultdict(list)
|
|
for lst in provider_forecasts:
|
|
for rec in lst:
|
|
if rec and isinstance(rec.get("high"), (int, float)):
|
|
by_date[rec["date"]].append(rec)
|
|
|
|
aggregated = []
|
|
for day in sorted(by_date.keys()):
|
|
entries = by_date[day]
|
|
highs = [e["high"] for e in entries if isinstance(e["high"], (int, float))]
|
|
if not highs:
|
|
continue
|
|
highs_sorted = sorted(highs)
|
|
n = len(highs_sorted)
|
|
mean_val = sum(highs_sorted) / n
|
|
if n % 2 == 1:
|
|
median_val = highs_sorted[n // 2]
|
|
else:
|
|
median_val = (highs_sorted[n // 2 - 1] + highs_sorted[n // 2]) / 2.0
|
|
spread = highs_sorted[-1] - highs_sorted[0]
|
|
|
|
aggregated.append({
|
|
"date": day,
|
|
"providers": entries,
|
|
"consensus": {
|
|
"mean_high": round(mean_val, 1),
|
|
"median_high": round(median_val, 1),
|
|
"provider_count": n,
|
|
"spread": round(spread, 1)
|
|
},
|
|
"units": units
|
|
})
|
|
|
|
if max_days is not None:
|
|
aggregated = aggregated[:max_days]
|
|
return aggregated
|
|
|
|
# ---------------------- Main ----------------------
|
|
|
|
def main():
|
|
conf = load_config()
|
|
|
|
out_dir = conf["output_dir"]
|
|
os.makedirs(out_dir, exist_ok=True)
|
|
raw_dir = ensure_dirs(out_dir)
|
|
|
|
days = int(conf.get("days", 5))
|
|
units = conf.get("units", "imperial") # 'imperial' or 'metric'
|
|
lat = float(conf["location"]["lat"])
|
|
lon = float(conf["location"]["lon"])
|
|
loc_name = conf["location"]["name"]
|
|
tz_name = conf["location"].get("timezone") or "UTC"
|
|
user_agent = conf.get("user_agent", "WeatherConsensus/1.0 (contact: example@example.com)")
|
|
|
|
date_tag = date_str_utc()
|
|
provider_results = []
|
|
raw_bundle = {}
|
|
|
|
# Open-Meteo
|
|
try:
|
|
if conf["providers"]["open_meteo"]["enabled"]:
|
|
om_list, om_raw = fetch_open_meteo(conf["providers"]["open_meteo"], lat, lon, days, units, tz_name)
|
|
provider_results.append(om_list)
|
|
raw_bundle["open_meteo"] = om_raw
|
|
with open(os.path.join(raw_dir, f"open-meteo_{date_tag}_{loc_name}.json"), "w") as f:
|
|
json.dump(om_raw, f, indent=2)
|
|
except Exception as e:
|
|
print("Open-Meteo error:", repr(e))
|
|
|
|
# NWS (api.weather.gov)
|
|
try:
|
|
if conf["providers"]["nws"]["enabled"]:
|
|
nws_list, nws_raw = fetch_nws(conf["providers"]["nws"], lat, lon, days, units, tz_name, user_agent)
|
|
provider_results.append(nws_list)
|
|
raw_bundle["nws"] = nws_raw
|
|
with open(os.path.join(raw_dir, f"nws_{date_tag}_{loc_name}.json"), "w") as f:
|
|
json.dump(nws_raw, f, indent=2)
|
|
except Exception as e:
|
|
print("NWS error:", repr(e))
|
|
|
|
# MET Norway
|
|
try:
|
|
if conf["providers"]["metno"]["enabled"]:
|
|
met_list, met_raw = fetch_metno(conf["providers"]["metno"], lat, lon, days, units, tz_name, user_agent)
|
|
provider_results.append(met_list)
|
|
raw_bundle["metno"] = met_raw
|
|
with open(os.path.join(raw_dir, f"metno_{date_tag}_{loc_name}.json"), "w") as f:
|
|
json.dump(met_raw, f, indent=2)
|
|
except Exception as e:
|
|
print("MET Norway error:", repr(e))
|
|
|
|
# 7Timer!
|
|
try:
|
|
if conf["providers"]["seventimer"]["enabled"]:
|
|
st_list, st_raw = fetch_seventimer(conf["providers"]["seventimer"], lat, lon, days, units, tz_name)
|
|
provider_results.append(st_list)
|
|
raw_bundle["7timer"] = st_raw
|
|
with open(os.path.join(raw_dir, f"7timer_{date_tag}_{loc_name}.json"), "w") as f:
|
|
json.dump(st_raw, f, indent=2)
|
|
except Exception as e:
|
|
print("7Timer error:", repr(e))
|
|
|
|
# Aggregate across providers
|
|
aggregated = aggregate_daily_highs(provider_forecasts=provider_results, units=units, max_days=days)
|
|
|
|
# Write aggregated & latest
|
|
agg_path = os.path.join(out_dir, f"aggregated_{date_tag}_{loc_name}.json")
|
|
with open(agg_path, "w") as f:
|
|
json.dump({
|
|
"location": loc_name,
|
|
"generated_at_utc": dt.now(timezone.utc).isoformat().replace("+00:00", "Z"),
|
|
"units": units,
|
|
"days": aggregated
|
|
}, f, indent=2)
|
|
|
|
# Update latest.json
|
|
latest_path = os.path.join(out_dir, "latest.json")
|
|
try:
|
|
with open(agg_path, "r") as src, open(latest_path, "w") as dst:
|
|
dst.write(src.read())
|
|
except Exception as e:
|
|
print("Failed to update latest.json:", repr(e))
|
|
|
|
if __name__ == "__main__":
|
|
main()
|