#!/usr/bin/env python3 """ Fetches weather from *no-key* providers, normalizes units, aggregates to a daily consensus, and writes JSON outputs into /var/www/WEATHER/public_html/data/. Providers: - Open-Meteo (no key) -> daily temperature_2m_max (C/F) [https://open-meteo.com] - api.weather.gov (no key; User-Agent required) -> periods [https://api.weather.gov] - MET Norway Locationforecast (no key; User-Agent) -> hourly [https://api.met.no] - 7Timer! civillight (no key) -> daily max/min [http://www.7timer.info] Outputs: data/raw/__.json (raw) data/aggregated__.json (aggregated daily consensus) data/latest.json (copy of aggregated) """ import os import json from datetime import datetime as dt from datetime import timezone from collections import defaultdict from zoneinfo import ZoneInfo # stdlib (Python 3.9+) import requests # ---------------------- Utilities ---------------------- def here(*parts): return os.path.join(os.path.dirname(os.path.abspath(__file__)), *parts) def load_config(): conf_path = here("config.json") if not os.path.exists(conf_path): raise FileNotFoundError( f"Missing config.json at {conf_path}. Copy config.example.json and fill in your values." ) with open(conf_path, "r") as f: return json.load(f) def ensure_dirs(output_dir): raw_dir = os.path.join(output_dir, "raw") os.makedirs(raw_dir, exist_ok=True) return raw_dir def date_str_utc(): return dt.now(timezone.utc).strftime("%Y-%m-%d") def c_to_f(c): return (float(c) * 9.0/5.0) + 32.0 def f_to_c(f): return (float(f) - 32.0) * 5.0/9.0 def normalize_temp(value, target_units, source_units): """Return temp in target_units ('imperial' or 'metric') given source_units.""" if value is None: return None if target_units == source_units: return float(value) if source_units == "metric" and target_units == "imperial": return c_to_f(value) if source_units == "imperial" and target_units == "metric": return f_to_c(value) # default passthrough return float(value) def iso_to_local_date(iso_ts, tz_name): """ Convert ISO-8601 string to local YYYY-MM-DD using zoneinfo tz. Accepts Z or +00:00. Returns date string. """ if iso_ts.endswith("Z"): iso_ts = iso_ts.replace("Z", "+00:00") dt_utc = dt.fromisoformat(iso_ts) if dt_utc.tzinfo is None: dt_utc = dt_utc.replace(tzinfo=timezone.utc) local = dt_utc.astimezone(ZoneInfo(tz_name)) return local.strftime("%Y-%m-%d") # ---------------------- Providers ---------------------- def fetch_open_meteo(provider_conf, lat, lon, days, units, tz_name): """ Open-Meteo: request daily max temperature. Supports temperature_unit and timezone parameters. No API key required. """ url = provider_conf["base_url"] params = { "latitude": lat, "longitude": lon, "daily": "temperature_2m_max", "timezone": tz_name or "auto", "temperature_unit": "fahrenheit" if units == "imperial" else "celsius" } r = requests.get(url, params=params, timeout=20) r.raise_for_status() data = r.json() result = [] daily = (data.get("daily") or {}) times = daily.get("time") or [] highs = daily.get("temperature_2m_max") or [] for t, h in list(zip(times, highs))[:days]: if h is None: continue result.append({ "date": t, # already aligned to requested timezone "high": round(float(h), 1), "units": units, "provider": "open-meteo" }) return result, data def fetch_nws(provider_conf, lat, lon, days, units, tz_name, user_agent): """ NWS api.weather.gov: 1) /points/{lat},{lon} -> get 'forecast' URL 2) GET forecast -> 'properties.periods' (local time). Temperatures usually in Fahrenheit. Requires a User-Agent header. """ base = provider_conf["base_url"].rstrip("/") headers = {"User-Agent": user_agent} # Metadata to find forecast URL meta_url = f"{base}/points/{round(lat,4)},{round(lon,4)}" r_meta = requests.get(meta_url, headers=headers, timeout=20) r_meta.raise_for_status() meta = r_meta.json() forecast_url = (meta.get("properties") or {}).get("forecast") if not forecast_url: return [], {"points": meta} r_fcst = requests.get(forecast_url, headers=headers, timeout=20) r_fcst.raise_for_status() fcst = r_fcst.json() by_date = defaultdict(list) for p in (fcst.get("properties") or {}).get("periods", []): start = p.get("startTime") temp = p.get("temperature") unit = p.get("temperatureUnit") # e.g., 'F' if start is None or temp is None: continue # Convert to configured timezone day day_key = iso_to_local_date(start, tz_name) # Normalize temperature to requested display units src_units = "imperial" if str(unit).upper() == "F" else "metric" temp_norm = normalize_temp(temp, target_units=units, source_units=src_units) by_date[day_key].append(temp_norm) result = [] for day in sorted(by_date.keys())[:days]: # Take the max observed temp for that calendar day vals = [v for v in by_date[day] if isinstance(v, (int, float))] if not vals: continue result.append({ "date": day, "high": round(max(vals), 1), "units": units, "provider": "nws" }) # Return both combined raw (meta + forecast) return result, {"points": meta, "forecast": fcst} def fetch_metno(provider_conf, lat, lon, days, units, tz_name, user_agent): """ MET Norway Locationforecast/2.0 compact (GeoJSON-like). 'properties.timeseries' with hourly 'instant.details.air_temperature' (°C). User-Agent is required by policy. """ url = provider_conf["base_url"] headers = {"User-Agent": user_agent} params = {"lat": round(lat, 4), "lon": round(lon, 4)} r = requests.get(url, headers=headers, params=params, timeout=20) r.raise_for_status() data = r.json() by_date = defaultdict(list) for item in (data.get("properties") or {}).get("timeseries", []): t = item.get("time") details = ((item.get("data") or {}).get("instant") or {}).get("details") or {} temp_c = details.get("air_temperature") if t is None or temp_c is None: continue # Convert timestamp to local calendar day for bucketing day_key = iso_to_local_date(t, tz_name) # Normalize to requested units val = normalize_temp(temp_c, target_units=units, source_units="metric") by_date[day_key].append(val) result = [] for day in sorted(by_date.keys())[:days]: vals = [v for v in by_date[day] if isinstance(v, (int, float))] if not vals: continue result.append({ "date": day, "high": round(max(vals), 1), "units": units, "provider": "metno" }) return result, data def fetch_seventimer(provider_conf, lat, lon, days, units, tz_name): """ 7Timer! civillight: JSON shape: { product:'civillight', init:'YYYYMMDDHH', dataseries:[ { date:YYYYMMDD, temp2m:{max,min}, ... } ] } Values are in metric by default; we request metric explicitly. """ url = provider_conf["base_url"] params = { "lon": lon, "lat": lat, "product": "civillight", "output": "json", "unit": "metric" } r = requests.get(url, params=params, timeout=20) r.raise_for_status() data = r.json() result = [] for d in (data.get("dataseries") or [])[:days]: date_raw = d.get("date") # e.g. 20250210 t2m = d.get("temp2m") or {} tmax_c = t2m.get("max") if date_raw is None or tmax_c is None: continue # 7Timer uses YYYYMMDD integer; convert to ISO date string s = str(date_raw) day = f"{s[0:4]}-{s[4:6]}-{s[6:8]}" high = normalize_temp(tmax_c, target_units=units, source_units="metric") result.append({ "date": day, "high": round(float(high), 1), "units": units, "provider": "7timer" }) return result, data # ---------------------- Aggregation ---------------------- def aggregate_daily_highs(provider_forecasts, units, max_days=None): """ Combine daily highs across providers. Output per date: providers[], consensus{mean, median, spread, provider_count}, units """ by_date = defaultdict(list) for lst in provider_forecasts: for rec in lst: if rec and isinstance(rec.get("high"), (int, float)): by_date[rec["date"]].append(rec) aggregated = [] for day in sorted(by_date.keys()): entries = by_date[day] highs = [e["high"] for e in entries if isinstance(e["high"], (int, float))] if not highs: continue highs_sorted = sorted(highs) n = len(highs_sorted) mean_val = sum(highs_sorted) / n if n % 2 == 1: median_val = highs_sorted[n // 2] else: median_val = (highs_sorted[n // 2 - 1] + highs_sorted[n // 2]) / 2.0 spread = highs_sorted[-1] - highs_sorted[0] aggregated.append({ "date": day, "providers": entries, "consensus": { "mean_high": round(mean_val, 1), "median_high": round(median_val, 1), "provider_count": n, "spread": round(spread, 1) }, "units": units }) if max_days is not None: aggregated = aggregated[:max_days] return aggregated # ---------------------- Main ---------------------- def main(): conf = load_config() out_dir = conf["output_dir"] os.makedirs(out_dir, exist_ok=True) raw_dir = ensure_dirs(out_dir) days = int(conf.get("days", 5)) units = conf.get("units", "imperial") # 'imperial' or 'metric' lat = float(conf["location"]["lat"]) lon = float(conf["location"]["lon"]) loc_name = conf["location"]["name"] tz_name = conf["location"].get("timezone") or "UTC" user_agent = conf.get("user_agent", "WeatherConsensus/1.0 (contact: example@example.com)") date_tag = date_str_utc() provider_results = [] raw_bundle = {} # Open-Meteo try: if conf["providers"]["open_meteo"]["enabled"]: om_list, om_raw = fetch_open_meteo(conf["providers"]["open_meteo"], lat, lon, days, units, tz_name) provider_results.append(om_list) raw_bundle["open_meteo"] = om_raw with open(os.path.join(raw_dir, f"open-meteo_{date_tag}_{loc_name}.json"), "w") as f: json.dump(om_raw, f, indent=2) except Exception as e: print("Open-Meteo error:", repr(e)) # NWS (api.weather.gov) try: if conf["providers"]["nws"]["enabled"]: nws_list, nws_raw = fetch_nws(conf["providers"]["nws"], lat, lon, days, units, tz_name, user_agent) provider_results.append(nws_list) raw_bundle["nws"] = nws_raw with open(os.path.join(raw_dir, f"nws_{date_tag}_{loc_name}.json"), "w") as f: json.dump(nws_raw, f, indent=2) except Exception as e: print("NWS error:", repr(e)) # MET Norway try: if conf["providers"]["metno"]["enabled"]: met_list, met_raw = fetch_metno(conf["providers"]["metno"], lat, lon, days, units, tz_name, user_agent) provider_results.append(met_list) raw_bundle["metno"] = met_raw with open(os.path.join(raw_dir, f"metno_{date_tag}_{loc_name}.json"), "w") as f: json.dump(met_raw, f, indent=2) except Exception as e: print("MET Norway error:", repr(e)) # 7Timer! try: if conf["providers"]["seventimer"]["enabled"]: st_list, st_raw = fetch_seventimer(conf["providers"]["seventimer"], lat, lon, days, units, tz_name) provider_results.append(st_list) raw_bundle["7timer"] = st_raw with open(os.path.join(raw_dir, f"7timer_{date_tag}_{loc_name}.json"), "w") as f: json.dump(st_raw, f, indent=2) except Exception as e: print("7Timer error:", repr(e)) # Aggregate across providers aggregated = aggregate_daily_highs(provider_forecasts=provider_results, units=units, max_days=days) # Write aggregated & latest agg_path = os.path.join(out_dir, f"aggregated_{date_tag}_{loc_name}.json") with open(agg_path, "w") as f: json.dump({ "location": loc_name, "generated_at_utc": dt.now(timezone.utc).isoformat().replace("+00:00", "Z"), "units": units, "days": aggregated }, f, indent=2) # Update latest.json latest_path = os.path.join(out_dir, "latest.json") try: with open(agg_path, "r") as src, open(latest_path, "w") as dst: dst.write(src.read()) except Exception as e: print("Failed to update latest.json:", repr(e)) if __name__ == "__main__": main()