Files
2026-06-25 23:17:45 +00:00

834 lines
33 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CGI endpoint for on-demand, cached consensus forecasts.
Usage:
GET /api/forecast.py?q=<place>&debug=1 # e.g., "Dallas, TX", "76084"
GET /api/forecast.py?lat=..&lon=.. # explicit coordinates
Geocoding order:
1) Open-Meteo Geocoding (names & postal codes, no key)
2) Nominatim fallback (no key); requires a custom User-Agent; ≤1 rps
Weather providers (no key):
- Open-Meteo (daily temp max/min, precip prob max, wind 10 m max; hourly humidity/precip/wind fallbacks)
- api.weather.gov (NWS; period PoP, wind, humidity → daily rollup; requires User-Agent)
- MET Norway Locationforecast (hourly humidity/wind/temp → daily rollup; requires User-Agent)
- 7Timer! civillight (daily temp max/min)
Cache:
24h per-location under /var/www/WEATHER/public_html/data/locations/<slug>/latest.json
"""
import os
import sys
import json
import re
import time
from datetime import datetime as dt, timezone
from urllib.parse import parse_qs
from collections import defaultdict
from zoneinfo import ZoneInfo
import requests
# -------------------- Paths & Config --------------------
# Root of the deployment; the other paths below are derived from it.
PUBLIC_ROOT = "/var/www/WEATHER/public_html"
# Per-location cache directories are created under DATA_ROOT/locations/<slug>.
DATA_ROOT = os.path.join(PUBLIC_ROOT, "data")
FETCHER_DIR = os.path.join(PUBLIC_ROOT, "fetcher") # config.json lives here
# JSON configuration read by load_config().
CONFIG_PATH = os.path.join(FETCHER_DIR, "config.json")
# Maximum age of a cached latest.json before providers are queried again.
CACHE_TTL_SECONDS = 24 * 3600 # 24 hours
# Marker file whose mtime records the last Nominatim call.
NOMINATIM_RATELIMIT_MARK = "/tmp/nominatim_last_call" # crude 1 rps limiter across CGI invocations
# Bump when payload shape changes; cached files with a different version are ignored.
# v5 adds: payload["timezone"], payload["all_days"]; cached responses are re-sliced to "today"
SCHEMA_VERSION = 5
def load_config():
    """Load and return the parsed JSON configuration stored at ``CONFIG_PATH``.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file does not contain valid JSON.
    """
    # JSON is UTF-8 by specification; decode explicitly instead of relying on
    # whatever locale the web server hands to the CGI process.
    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
        return json.load(f)
# -------------------- CGI helpers --------------------
def http_json(payload, status=200, cors="*"):
    """Write a complete CGI response to stdout: headers, blank line, JSON body.

    Args:
        payload: JSON-serialisable object used as the response body.
        status: numeric HTTP status; the reason phrase is "OK" for 200 and
            "ERROR" for anything else.
        cors: value for ``Access-Control-Allow-Origin``; a falsy value omits
            the header.
    """
    reason = "ERROR"
    if status == 200:
        reason = "OK"
    header_lines = [
        f"Status: {status} {reason}",
        "Content-Type: application/json; charset=utf-8",
    ]
    if cors:
        header_lines.append(f"Access-Control-Allow-Origin: {cors}")
    for line in header_lines:
        print(line)
    # The empty line terminates the header section.
    print()
    body = json.dumps(payload, indent=2)
    sys.stdout.write(body)
def http_error(message, status=400, debug=None):
    """Emit a JSON error response of the form ``{"error": message}``.

    A ``"debug"`` member is added only when ``debug`` is not None. The response
    is written through ``http_json`` with the given ``status``.
    """
    if debug is None:
        body = {"error": message}
    else:
        body = {"error": message, "debug": debug}
    http_json(body, status=status)
# -------------------- General helpers --------------------
def parse_query():
    """Parse the CGI ``QUERY_STRING`` into ``(q, lat, lon, debug)``.

    Returns:
        tuple: ``q`` is the stripped free-text query ("" when absent),
        ``lat``/``lon`` are floats or both None, and ``debug`` is True only
        for ``debug=1``.

    Coordinates are used only when both ``lat`` and ``lon`` are present, parse
    as numbers and are valid geographic coordinates. Anything else yields
    ``(None, None)`` so the caller can fall back to ``q``.
    """
    params = parse_qs(os.environ.get("QUERY_STRING", ""), keep_blank_values=False)

    def first(name, default=None):
        # parse_qs maps every key to a list; only the first occurrence is used.
        return params.get(name, [default])[0]

    q = (first("q") or "").strip()
    debug = first("debug", "0") == "1"
    lat_raw, lon_raw = first("lat"), first("lon")

    latf = lonf = None
    if lat_raw is not None and lon_raw is not None:
        try:
            latf, lonf = float(lat_raw), float(lon_raw)
        except ValueError:
            latf = lonf = None
        else:
            # float() accepts "nan" and "inf"; the range comparison is False
            # for NaN and rejects infinities and impossible coordinates.
            if not (-90.0 <= latf <= 90.0 and -180.0 <= lonf <= 180.0):
                latf = lonf = None
    return q, latf, lonf, debug
def slugify(s: str) -> str:
    """Turn ``s`` into a lowercase, dash-separated slug.

    Alphanumeric characters are lowercased and every other character becomes
    a dash. Runs of dashes collapse to one, and leading/trailing dashes are
    removed.
    """
    mapped = []
    for ch in s.strip():
        mapped.append(ch.lower() if ch.isalnum() else "-")
    # Splitting on "-" and dropping empty chunks collapses dash runs and trims
    # dashes from both ends in a single step.
    chunks = [chunk for chunk in "".join(mapped).split("-") if chunk]
    return "-".join(chunks)
def ensure_dirs(path):
    """Create directory ``path`` and any missing parents.

    An already existing directory is not an error.
    """
    os.makedirs(path, exist_ok=True)
def file_is_fresh(path, ttl_seconds):
    """Return True when ``path`` exists and was modified less than ``ttl_seconds`` ago.

    A missing or unreadable file is reported as not fresh. A modification time
    in the future (negative age) is also treated as not fresh.
    """
    # Ask for the mtime directly instead of exists()+getmtime(): the file can
    # disappear between the two calls when several CGI processes run at once.
    try:
        modified = os.path.getmtime(path)
    except OSError:
        return False
    age = time.time() - modified
    return 0 <= age < ttl_seconds
def c_to_f(c):
    """Convert a Celsius temperature (number or numeric string) to Fahrenheit."""
    celsius = float(c)
    return celsius * 9.0 / 5.0 + 32.0
def f_to_c(f):
    """Convert a Fahrenheit temperature (number or numeric string) to Celsius."""
    above_freezing = float(f) - 32.0
    return above_freezing * 5.0 / 9.0
def mph_to_kmh(m):
    """Convert a speed in miles per hour to kilometres per hour."""
    km_per_mile = 1.609344
    return float(m) * km_per_mile
def mps_to_mph(s):
    """Convert a speed in metres per second to miles per hour."""
    mph_per_mps = 2.23693629
    return float(s) * mph_per_mps
def mps_to_kmh(s):
    """Convert a speed in metres per second to kilometres per hour."""
    kmh_per_mps = 3.6
    return float(s) * kmh_per_mps
def normalize_temp(value, target_units, source_units):
    """Return ``value`` expressed in ``target_units`` as a float.

    ``"metric"`` means Celsius and ``"imperial"`` means Fahrenheit. ``None``
    is passed through. Any other unit combination (including identical units)
    returns ``float(value)`` unconverted.
    """
    if value is None:
        return None
    conversion = (source_units, target_units)
    if conversion == ("metric", "imperial"):
        return c_to_f(value)
    if conversion == ("imperial", "metric"):
        return f_to_c(value)
    return float(value)
def normalize_wind(value, target_units, source_origin):
    """Return a wind speed in the unit implied by ``target_units``.

    ``target_units == "imperial"`` yields mph; any other value is treated as
    metric and yields km/h. ``source_origin`` names the unit of ``value``
    ("mph", "kmh" or "mps"); an unrecognised origin returns ``float(value)``
    unchanged. ``None`` is passed through.
    """
    if value is None:
        return None
    speed = float(value)
    to_imperial = target_units == "imperial"
    if source_origin == "mph":
        return speed if to_imperial else mph_to_kmh(speed)
    if source_origin == "kmh":
        return speed / 1.609344 if to_imperial else speed
    if source_origin == "mps":
        return mps_to_mph(speed) if to_imperial else mps_to_kmh(speed)
    return speed
def iso_to_local_date(iso_ts, tz_name):
    """Convert an ISO-8601 timestamp to a local ``YYYY-MM-DD`` string.

    A trailing ``Z`` is accepted as UTC, and a timestamp without any offset is
    assumed to be UTC. The instant is then converted to the IANA zone
    ``tz_name`` and formatted as a calendar date.
    """
    text = iso_ts.replace("Z", "+00:00") if iso_ts.endswith("Z") else iso_ts
    moment = dt.fromisoformat(text)
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    local_moment = moment.astimezone(ZoneInfo(tz_name))
    return local_moment.strftime("%Y-%m-%d")
# -------------------- Geocoding: overrides + OpenMeteo + Nominatim --------------------
# Two-letter USPS abbreviation -> full name for the 50 states plus DC.
# Used by the geocoding helpers to expand a trailing state token in a query.
US_STATES = {
"AL": "Alabama", "AK": "Alaska", "AZ": "Arizona", "AR": "Arkansas", "CA": "California",
"CO": "Colorado", "CT": "Connecticut", "DE": "Delaware", "FL": "Florida", "GA": "Georgia",
"HI": "Hawaii", "ID": "Idaho", "IL": "Illinois", "IN": "Indiana", "IA": "Iowa", "KS": "Kansas",
"KY": "Kentucky", "LA": "Louisiana", "ME": "Maine", "MD": "Maryland", "MA": "Massachusetts",
"MI": "Michigan", "MN": "Minnesota", "MS": "Mississippi", "MO": "Missouri", "MT": "Montana",
"NE": "Nebraska", "NV": "Nevada", "NH": "New Hampshire", "NJ": "New Jersey", "NM": "New Mexico",
"NY": "New York", "NC": "North Carolina", "ND": "North Dakota", "OH": "Ohio", "OK": "Oklahoma",
"OR": "Oregon", "PA": "Pennsylvania", "RI": "Rhode Island", "SC": "South Carolina",
"SD": "South Dakota", "TN": "Tennessee", "TX": "Texas", "UT": "Utah", "VT": "Vermont",
"VA": "Virginia", "WA": "Washington", "WV": "West Virginia", "WI": "Wisconsin", "WY": "Wyoming",
"DC": "District of Columbia"
}
# Matches a whole string that is a 5-digit ZIP or ZIP+4 (surrounding whitespace allowed).
_zip_re = re.compile(r"^\s*\d{5}(?:-\d{4})?\s*$")
def norm_key(s: str) -> str:
    """Normalise a place query for override lookups.

    The text is lowercased. Every character that is not a word character,
    comma, whitespace or hyphen becomes a space. Whitespace runs collapse to a
    single space and the result is trimmed.
    """
    t = re.sub(r"[^\w,\s\-]", " ", s.lower())
    t = re.sub(r"\s+", " ", t)
    # Trim last: punctuation at either end ("Dallas, TX!") has just been
    # turned into spaces and would otherwise leave the key padded.
    return t.strip()
def lookup_override(q: str, overrides: dict):
    """Resolve ``q`` against the configured geocoding overrides.

    Args:
        q: raw user query.
        overrides: mapping of key -> entry dict with ``lat``/``lon`` and
            optional ``name`` and ``aliases``; may be empty or None.

    Returns:
        A geocode result dict (``source`` is ``"override"``) or None when
        nothing matches. Entries lacking ``lat`` or ``lon`` are never returned.

    Matching order: exact key, then keys compared after normalisation, then
    aliases compared after normalisation.
    """
    if not overrides:
        return None
    wanted = norm_key(q)

    def as_result(entry):
        # Only entries that carry both coordinates can be used.
        if not isinstance(entry, dict) or "lat" not in entry or "lon" not in entry:
            return None
        return {
            "source": "override",
            "name": entry.get("name") or q,
            "lat": float(entry["lat"]),
            "lon": float(entry["lon"]),
            "admin1": None,
            "country": None,
        }

    # 1) Exact hit on a key that is already stored in normalised form.
    hit = as_result(overrides.get(wanted))
    if hit is not None:
        return hit

    # 2) Keys written with different case or punctuation ("Dallas, TX") are
    #    normalised the same way aliases are, so they can match as well.
    for key, entry in overrides.items():
        if isinstance(key, str) and norm_key(key) == wanted:
            hit = as_result(entry)
            if hit is not None:
                return hit

    # 3) Aliases.
    for entry in overrides.values():
        if not isinstance(entry, dict):
            continue
        for alias in entry.get("aliases") or []:
            if isinstance(alias, str) and norm_key(alias) == wanted:
                hit = as_result(entry)
                if hit is not None:
                    return hit
    return None
def build_om_attempts(query: str):
    """Build the ordered list of ``(name, country_code_or_None)`` variants to try.

    Variants, in order: the query as typed (pinned to "US" when it looks like
    a ZIP code); "City, Full State" and "City, ST, US" when the last token is
    a US state abbreviation; a punctuation-stripped form; and the query with
    ", United States" appended when it mentions Texas/TX/USA. Duplicates
    (case-insensitive) are removed, keeping the first occurrence.
    """
    q = (query or "").strip()
    if not q:
        return []

    tokens = re.split(r"[, ]+", q)
    last = tokens[-1].upper() if tokens else ""
    state_full = US_STATES.get(last)

    candidates = [(q, "US" if _zip_re.match(q) else None)]

    if state_full:
        city = " ".join(tokens[:-1]).strip(", ")
        if city:
            candidates += [
                (f"{city}, {state_full}", "US"),
                (f"{city}, {last}, US", "US"),
            ]

    stripped = re.sub(r"\s+", " ", re.sub(r"[^a-zA-Z0-9 ]+", " ", q)).strip()
    if stripped and stripped != q:
        candidates.append((stripped, None))

    lowered = q.lower()
    if " texas" in lowered or ", tx" in lowered or " usa" in lowered:
        candidates.append((q + ", United States", "US"))

    # An insertion-ordered dict keeps the first variant seen for each key.
    unique = {}
    for name, cc in candidates:
        unique.setdefault((name.lower(), (cc or "").lower()), (name, cc))
    return list(unique.values())
def geocode_open_meteo_first(query, user_agent, language="en", attempts_out=None):
    """Geocode ``query`` with the Open-Meteo search endpoint.

    Each variant from ``build_om_attempts`` is tried in order, and the first
    one that returns a result wins. Every attempt is appended to
    ``attempts_out`` (when given) for debugging. A failure on one variant is
    logged to stderr and the next variant is tried.

    Returns:
        dict with ``source``, ``name``, ``admin1``, ``country``, ``lat`` and
        ``lon``, or None when no variant produced a result.
    """
    endpoint = "https://geocoding-api.open-meteo.com/v1/search"
    request_headers = {"User-Agent": user_agent}
    trail = [] if attempts_out is None else attempts_out

    for name, cc in build_om_attempts(query):
        trail.append({"geocoder": "open-meteo", "name": name, "countryCode": cc})
        params = {"name": name, "count": 1, "language": language, "format": "json"}
        if cc:
            params["countryCode"] = cc
        try:
            resp = requests.get(endpoint, params=params, headers=request_headers, timeout=15)
            resp.raise_for_status()
            matches = resp.json().get("results") or []
            if not matches:
                continue
            best = matches[0]
            return {
                "source": "open-meteo",
                "name": best.get("name"),
                "admin1": best.get("admin1"),
                "country": best.get("country"),
                "lat": float(best["latitude"]),
                "lon": float(best["longitude"]),
            }
        except Exception as e:
            print(f"Open-Meteo geocode error for {name} cc={cc}: {e}", file=sys.stderr)
    return None
def _nominatim_rate_limit_once_per_second():
    """Best-effort throttle: keep Nominatim calls at least one second apart.

    The modification time of ``NOMINATIM_RATELIMIT_MARK`` records the previous
    call. If that was less than a second ago the function sleeps for the
    remainder, then rewrites the marker. Any failure is logged to stderr and
    otherwise ignored, so the limiter never blocks a request from going out.
    """
    try:
        try:
            last = os.path.getmtime(NOMINATIM_RATELIMIT_MARK)
        except OSError:
            last = None  # no marker yet (or unreadable): nothing to wait for
        if last is not None:
            wait = 1.0 - (time.time() - last)
            if wait > 0:
                # Cap the delay: a marker dated in the future (clock change)
                # must not stall the request for longer than the 1 s budget.
                time.sleep(min(wait, 1.0))
        with open(NOMINATIM_RATELIMIT_MARK, "w") as f:
            f.write(str(time.time()))
    except Exception as e:
        print(f"Nominatim rate-limit marker error (ignored): {e}", file=sys.stderr)
def geocode_nominatim_fallback(query, user_agent, email=None, country_hint="us", language="en", attempts_out=None):
    """Geocode ``query`` with Nominatim, trying up to three request shapes.

    Order: (A) free-form search restricted to ``country_hint`` when one is
    given, (B) a structured city/state search with country "US" when the query
    looks like "City, ST", (C) free-form search without any restriction. The
    first shape that returns a result wins. Errors are logged to stderr and
    the next shape is tried. Each request is throttled through
    ``_nominatim_rate_limit_once_per_second`` and recorded in ``attempts_out``.

    Returns:
        dict with ``source``, ``name``, ``admin1``, ``country``, ``lat`` and
        ``lon``, or None when the query is empty or nothing matched.
    """
    trail = [] if attempts_out is None else attempts_out
    q = (query or "").strip()
    if not q:
        return None

    endpoint = "https://nominatim.openstreetmap.org/search"
    request_headers = {"User-Agent": user_agent}
    logged_keys = ("q", "city", "state", "country", "countrycodes")

    def call_nominatim(params):
        _nominatim_rate_limit_once_per_second()
        trail.append({"geocoder": "nominatim", **{k: v for k, v in params.items() if k in logged_keys}})
        resp = requests.get(endpoint, params=params, headers=request_headers, timeout=15)
        resp.raise_for_status()
        hits = resp.json()
        if not (isinstance(hits, list) and hits):
            return None
        top = hits[0]
        lat = float(top["lat"])
        lon = float(top["lon"])
        addr = top.get("address") or {}
        name = (
            addr.get("city")
            or addr.get("town")
            or addr.get("village")
            or addr.get("municipality")
            or top.get("display_name", q)
        )
        admin1 = addr.get("state")
        country = addr.get("country")
        display = ", ".join(x for x in (name, admin1, country) if x)
        return {"source": "nominatim", "name": display or q, "admin1": admin1, "country": country, "lat": lat, "lon": lon}

    def with_common(leading):
        # Search-specific fields first, then the options every request shares.
        params = dict(leading)
        params.update({"format": "json", "addressdetails": 1, "limit": 1, "accept-language": language})
        return params

    plans = []

    # A) Freeform with country hint
    hinted = with_common({"q": q})
    if country_hint:
        hinted["countrycodes"] = country_hint.lower()
    plans.append(("Nominatim free-form (hint) error", hinted))

    # B) Structured City/State[/Country]
    m = re.match(r"^\s*([A-Za-z .'\-]+)[,\s]+\s*([A-Za-z]{2,})\s*$", q)
    if m:
        city_raw, state_token = m.group(1), m.group(2)
        state_full = US_STATES.get(state_token.upper(), state_token)
        plans.append((
            "Nominatim structured error",
            with_common({"city": city_raw.strip(), "state": state_full, "country": "US"}),
        ))

    # C) Freeform without hint
    plans.append(("Nominatim free-form (no hint) error", with_common({"q": q})))

    for label, params in plans:
        if email:
            params["email"] = email
        try:
            found = call_nominatim(params)
        except Exception as e:
            print(f"{label}: {e}", file=sys.stderr)
            continue
        if found:
            return found
    return None
# -------------------- Providers (OpenMeteo / NWS / MET / 7Timer) --------------------
def fetch_open_meteo(conf, lat, lon, days, units, tz_name, user_agent):
    """Fetch the Open-Meteo forecast and roll it up into per-day records.

    Args:
        conf: provider config; ``conf["base_url"]`` is the forecast endpoint.
        lat, lon: coordinates passed to the API.
        days: maximum number of (earliest) days to return.
        units: "imperial" requests Fahrenheit/mph; anything else Celsius/km/h.
        tz_name: timezone passed to the API; a falsy value becomes "auto".
        user_agent: value for the User-Agent header.

    Returns:
        ``(records, raw)``. Each record has ``provider``, ``date`` and
        ``units`` plus whichever of ``high``, ``low``, ``precip_chance``,
        ``wind_max`` and ``humidity_avg`` could be derived. Daily values are
        preferred; hourly maxima are used only to fill in a missing
        ``precip_chance`` or ``wind_max``. ``raw`` is the decoded API response.

    Raises:
        Whatever ``requests`` raises for transport or HTTP errors.
    """
    imperial = units == "imperial"
    query = {
        "latitude": lat, "longitude": lon,
        "daily": "temperature_2m_max,temperature_2m_min,precipitation_probability_max,wind_speed_10m_max",
        "hourly": "relative_humidity_2m,precipitation_probability,wind_speed_10m",
        "temperature_unit": "fahrenheit" if imperial else "celsius",
        "windspeed_unit": "mph" if imperial else "kmh",
        "timezone": tz_name or "auto",
    }
    resp = requests.get(conf["base_url"], params=query, headers={"User-Agent": user_agent}, timeout=20)
    resp.raise_for_status()
    data = resp.json()

    by_day = defaultdict(lambda: {"provider": "open-meteo"})

    # ---- daily values, taken as-is ----
    daily = data.get("daily") or {}
    dates = daily.get("time") or []
    blanks = [None] * len(dates)
    highs = daily.get("temperature_2m_max") or blanks
    lows = daily.get("temperature_2m_min") or blanks
    pops = daily.get("precipitation_probability_max") or blanks
    winds = daily.get("wind_speed_10m_max") or blanks
    for idx, day in enumerate(dates):
        rec = by_day[day]
        if highs[idx] is not None:
            rec["high"] = round(float(highs[idx]), 1)
        if lows[idx] is not None:
            rec["low"] = round(float(lows[idx]), 1)
        if pops[idx] is not None:
            rec["precip_chance"] = int(pops[idx])
        if winds[idx] is not None:
            rec["wind_max"] = round(float(winds[idx]), 1)

    # ---- hourly series: humidity samples plus fallback maxima ----
    hourly = data.get("hourly") or {}
    stamps = hourly.get("time") or []
    humidity_series = hourly.get("relative_humidity_2m") or []
    pop_series = hourly.get("precipitation_probability") or []
    wind_series = hourly.get("wind_speed_10m") or []

    # The first ten characters of an hourly timestamp are its YYYY-MM-DD date.
    for stamp, rh in zip(stamps, humidity_series):
        if rh is not None:
            by_day[stamp[:10]].setdefault("_hum_list", []).append(float(rh))

    for stamp, pop in zip(stamps, pop_series):
        if pop is None:
            continue
        rec = by_day[stamp[:10]]
        seen = rec.get("_pop_max")
        rec["_pop_max"] = int(pop) if seen is None else max(seen, int(pop))

    for stamp, wind in zip(stamps, wind_series):
        if wind is None:
            continue
        rec = by_day[stamp[:10]]
        seen = rec.get("_wind_max")
        rec["_wind_max"] = float(wind) if seen is None else max(seen, float(wind))

    # ---- finalise: resolve the scratch fields and drop them ----
    result = []
    for day in sorted(by_day)[:days]:
        rec = by_day[day]
        samples = rec.pop("_hum_list", None)
        if samples:
            rec["humidity_avg"] = int(round(sum(samples) / len(samples)))
        hourly_pop = rec.pop("_pop_max", None)
        if "precip_chance" not in rec and hourly_pop is not None:
            rec["precip_chance"] = int(hourly_pop)
        hourly_wind = rec.pop("_wind_max", None)
        if "wind_max" not in rec and hourly_wind is not None:
            rec["wind_max"] = round(float(hourly_wind), 1)
        rec["date"] = day
        rec["units"] = units
        result.append(rec)
    return result, data
def _parse_nws_wind_mph(s):
"""Parse 'Calm', '5 mph', '10 to 20 mph' -> number mph (max)."""
if not s or not isinstance(s, str):
return None
s_lower = s.lower()
if "calm" in s_lower:
return 0.0
nums = [float(x) for x in re.findall(r"(\d+(?:\.\d+)?)", s)]
return max(nums) if nums else None
def fetch_nws(conf, lat, lon, days, units, tz_name, user_agent):
    """Fetch the api.weather.gov forecast and roll its periods up per local day.

    The ``/points/{lat},{lon}`` endpoint is queried first to discover the
    forecast URL. Each forecast period is then assigned to the local date (in
    ``tz_name``) of its ``startTime``. Per day the record tracks the max/min
    period temperature, the max precipitation probability, the max wind speed
    and the mean relative humidity. Period temperatures are treated as
    Fahrenheit and wind strings as mph before conversion to ``units``.

    Returns:
        ``(records, raw)`` where ``raw`` holds the decoded ``points`` response
        and, when a forecast URL was available, the ``forecast`` response. With
        no forecast URL the records list is empty.

    Raises:
        Whatever ``requests`` raises for transport or HTTP errors.
    """
    base = conf["base_url"].rstrip("/")
    headers = {"User-Agent": user_agent}

    points_resp = requests.get(f"{base}/points/{round(lat,4)},{round(lon,4)}", headers=headers, timeout=20)
    points_resp.raise_for_status()
    meta = points_resp.json()
    forecast_url = (meta.get("properties") or {}).get("forecast")
    if not forecast_url:
        return [], {"points": meta}

    forecast_resp = requests.get(forecast_url, headers=headers, timeout=20)
    forecast_resp.raise_for_status()
    fcst = forecast_resp.json()

    buckets = defaultdict(lambda: {"provider": "nws"})
    for period in (fcst.get("properties") or {}).get("periods", []):
        start = period.get("startTime")
        if not start:
            continue
        rec = buckets[iso_to_local_date(start, tz_name)]

        # Temperature: track both extremes across the day's periods.
        temp = period.get("temperature")
        if temp is not None:
            val = float(normalize_temp(temp, units, "imperial"))
            rec["high"] = max(rec.get("high", -1e9), val)
            rec["low"] = min(rec.get("low", 1e9), val)

        # Precipitation probability (%).
        pop = (period.get("probabilityOfPrecipitation") or {}).get("value")
        if pop is not None:
            rec["precip_chance"] = max(int(pop), rec.get("precip_chance", -1))

        # Wind text such as "10 mph" or "10 to 20 mph".
        wind_mph = _parse_nws_wind_mph(period.get("windSpeed"))
        if wind_mph is not None:
            rec["wind_max"] = max(rec.get("wind_max", -1e9), normalize_wind(wind_mph, units, "mph"))

        # Relative humidity (%), averaged later.
        rh = (period.get("relativeHumidity") or {}).get("value")
        if rh is not None:
            rec.setdefault("_hum_list", []).append(float(rh))

    result = []
    for day in sorted(buckets)[:days]:
        rec = buckets[day]
        samples = rec.pop("_hum_list", None)
        if samples:
            rec["humidity_avg"] = int(round(sum(samples) / len(samples)))
        for field in ("wind_max", "high"):
            if field in rec:
                rec[field] = round(rec[field], 1)
        if "low" in rec and rec["low"] != 1e9:
            rec["low"] = round(rec["low"], 1)
        rec["date"] = day
        rec["units"] = units
        result.append(rec)
    return result, {"points": meta, "forecast": fcst}
def fetch_metno(conf, lat, lon, days, units, tz_name, user_agent):
    """Fetch MET Norway Locationforecast data and roll it up per local day.

    Every timeseries entry is assigned to the local date (in ``tz_name``) of
    its timestamp. Per day the record carries the mean relative humidity, the
    max wind speed (source m/s, converted to ``units``) and the minimum air
    temperature as ``low`` (source Celsius; converted to Fahrenheit unless
    ``units`` is "metric").

    Returns:
        ``(records, raw)``: at most ``days`` records for the earliest dates,
        and the decoded API response.

    Raises:
        Whatever ``requests`` raises for transport or HTTP errors.
    """
    query = {"lat": round(lat, 4), "lon": round(lon, 4)}
    resp = requests.get(conf["base_url"], params=query, headers={"User-Agent": user_agent}, timeout=20)
    resp.raise_for_status()
    data = resp.json()

    buckets = defaultdict(lambda: {"provider": "metno"})
    for entry in (data.get("properties") or {}).get("timeseries", []):
        stamp = entry.get("time")
        if not stamp:
            continue
        day = iso_to_local_date(stamp, tz_name)
        details = ((entry.get("data") or {}).get("instant") or {}).get("details") or {}
        humidity = details.get("relative_humidity")  # %
        wind = details.get("wind_speed")  # m/s
        temp_c = details.get("air_temperature")  # °C
        # A day's bucket is created only once a usable value shows up, so
        # entries without any data never produce an empty record.
        if humidity is not None:
            buckets[day].setdefault("_hum_list", []).append(float(humidity))
        if wind is not None:
            speed = normalize_wind(wind, units, "mps")
            buckets[day]["wind_max"] = max(buckets[day].get("wind_max", -1e9), speed)
        if temp_c is not None:
            temp = float(temp_c) if units == "metric" else float(c_to_f(temp_c))
            buckets[day].setdefault("_t_list", []).append(temp)

    result = []
    for day in sorted(buckets)[:days]:
        rec = buckets[day]
        humidity_samples = rec.pop("_hum_list", None)
        if humidity_samples:
            rec["humidity_avg"] = int(round(sum(humidity_samples) / len(humidity_samples)))
        temp_samples = rec.pop("_t_list", None)
        if temp_samples:
            # NOTE(review): only the daily low is derived from the samples; no
            # "high" is emitted for this provider — confirm that is intended.
            rec["low"] = round(min(temp_samples), 1)
        if "wind_max" in rec:
            rec["wind_max"] = round(rec["wind_max"], 1)
        rec["date"] = day
        rec["units"] = units
        result.append(rec)
    return result, data
def fetch_seventimer(conf, lat, lon, days, units, tz_name, user_agent):
    """Fetch the 7Timer! ``civillight`` product and return daily high/low records.

    The API is always queried in metric, and temperatures are converted to
    ``units`` afterwards. Only the first ``days`` entries of ``dataseries``
    are considered, and entries without a date are skipped. ``tz_name`` is
    accepted for signature parity with the other providers but is not used.

    Returns:
        ``(records, raw)``: records with ``date``, ``units``, ``provider`` and
        optional ``high``/``low``, plus the decoded API response.

    Raises:
        Whatever ``requests`` raises for transport or HTTP errors.
    """
    query = {"lon": lon, "lat": lat, "product": "civillight", "output": "json", "unit": "metric"}
    resp = requests.get(conf["base_url"], params=query, headers={"User-Agent": user_agent}, timeout=20)
    resp.raise_for_status()
    data = resp.json()

    result = []
    for entry in (data.get("dataseries") or [])[:days]:
        stamp = entry.get("date")
        temps = entry.get("temp2m") or {}
        extremes = (("high", temps.get("max")), ("low", temps.get("min")))
        if stamp is None:
            continue
        # The date is sliced as eight YYYYMMDD characters into YYYY-MM-DD.
        digits = str(stamp)
        rec = {"date": f"{digits[0:4]}-{digits[4:6]}-{digits[6:8]}", "units": units, "provider": "7timer"}
        for field, celsius in extremes:
            if celsius is not None:
                rec[field] = round(float(normalize_temp(celsius, target_units=units, source_units="metric")), 1)
        result.append(rec)
    return result, data
# -------------------- Aggregation --------------------
def aggregate_days(provider_lists, units, max_days=None):
    """Merge per-provider daily records into one consensus object per date.

    Args:
        provider_lists: iterable of lists of provider records; each record is
            a dict with a ``date`` key and optional ``high``, ``low``,
            ``precip_chance``, ``wind_max`` and ``humidity_avg``.
        units: "metric" or anything else (treated as imperial); selects the
            sanity limits and the reported wind unit.
        max_days: when not None, keep only the first ``max_days`` dates.

    Returns:
        List of day objects sorted by date. Each has ``date``, ``providers``
        (the raw records), ``consensus`` (means, medians, spreads and
        ``provider_count``) and unit labels. A statistic is None when no
        provider supplied a usable value for it.

    Values are sanitised before statistics are computed. Non-numeric, NaN and
    infinite values are dropped. Temperatures outside -100..140 °F (or the
    Celsius equivalent) and winds outside 0..200 mph (or the km/h equivalent)
    are dropped. Percentages are rounded and clamped to 0..100.
    """
    by_date = defaultdict(list)
    for lst in provider_lists:
        for rec in lst or []:
            if rec:
                by_date[rec["date"]].append(rec)

    # Unit-aware guardrails. The wind limit is 200 mph expressed in the active
    # unit, so strong but plausible km/h values are not discarded.
    if units == "metric":
        temp_lo, temp_hi = -73.3, 60.0  # ~ -100..140 F
        wind_hi = 200.0 * 1.609344  # ~ 321.9 km/h
    else:
        temp_lo, temp_hi = -100.0, 140.0
        wind_hi = 200.0

    def _finite(xs):
        return [x for x in xs if isinstance(x, (int, float)) and x == x and abs(x) != float("inf")]

    def _within(xs, lo, hi):
        return [x for x in _finite(xs) if lo <= x <= hi]

    def _pct(xs):
        return [min(100, max(0, int(round(x)))) for x in _finite(xs)]

    def _mean(xs):
        return round(sum(xs) / len(xs), 1) if xs else None

    def _int_mean(xs):
        return int(round(sum(xs) / len(xs))) if xs else None

    def _median(xs):
        if not xs:
            return None
        ordered = sorted(xs)
        mid = len(ordered) // 2
        if len(ordered) % 2 == 1:
            return round(ordered[mid], 1)
        return round((ordered[mid - 1] + ordered[mid]) / 2.0, 1)

    def _spread(xs):
        # A spread needs at least two providers to be meaningful.
        return round(max(xs) - min(xs), 1) if len(xs) >= 2 else None

    aggregated = []
    for day in sorted(by_date):
        entries = by_date[day]
        highs = _within([e.get("high") for e in entries], temp_lo, temp_hi)
        lows = _within([e.get("low") for e in entries], temp_lo, temp_hi)
        pops = _pct([e.get("precip_chance") for e in entries])
        winds = _within([e.get("wind_max") for e in entries], 0, wind_hi)
        hums = _pct([e.get("humidity_avg") for e in entries])

        aggregated.append({
            "date": day,
            "providers": entries,
            "consensus": {
                "mean_high": _mean(highs),
                "median_high": _median(highs),
                "mean_low": _mean(lows),
                "median_low": _median(lows),
                "precip_chance_mean": _int_mean(pops),
                "wind_max_mean": _mean(winds),
                "humidity_avg_mean": _int_mean(hums),
                "provider_count": len(entries),
                "spread_high": _spread(highs),
                "spread_low": _spread(lows),
            },
            "units": units,
            "wind_unit": "mph" if units == "imperial" else "km/h",
            "humidity_unit": "%",
            "precip_unit": "%",
        })

    if max_days is not None:
        aggregated = aggregated[:max_days]
    return aggregated
# -------------------- Display label de-duplication --------------------
def _dedupe_label(name, admin1, country):
parts_in = [p for p in [name, admin1, country] if p]
out = []
seen = set()
combined = []
for p in parts_in:
if p:
combined.extend([s.strip() for s in str(p).split(",") if s and s.strip()])
for s in combined:
key = s.lower()
if key and key not in seen:
seen.add(key)
out.append(s)
return ", ".join(out)
# -------------------- Slicing helpers for "today" in forecast timezone --------------------
def _parse_ymd_safe(s: str):
"""Robust YYYY-M-D or YYYY-MM-DD parser returning a date() (naively in UTC)."""
s2 = (s or "").strip()
m = re.match(r"^\s*(\d{4})-(\d{1,2})-(\d{1,2})\s*$", s2)
if not m:
return None
y, mo, d = int(m.group(1)), int(m.group(2)), int(m.group(3))
try:
return dt(y, mo, d, tzinfo=timezone.utc).date()
except Exception:
return None
def slice_days_for_timezone(all_days, tz_name, tz_fallback, window_days=7):
    """Select the forecast window that starts at "today" in the location's zone.

    Args:
        all_days: day dicts with a ``date`` string (``YYYY-MM-DD``); may be None.
        tz_name: preferred IANA timezone name.
        tz_fallback: timezone used when ``tz_name`` cannot be loaded.
        window_days: maximum number of days to return (default 7).

    Returns:
        ``(window, today_iso)``: shallow copies of the entries dated today or
        later, sorted by date and limited to ``window_days``, plus the local
        date used as "today". Entries with an unparseable date are dropped.
    """
    today = None
    for candidate in (tz_name, tz_fallback):
        try:
            today = dt.now(ZoneInfo(candidate)).date()
            break
        except Exception:
            continue
    if today is None:
        # Neither name could be loaded (bad config, missing tz database):
        # fall back to UTC rather than failing the whole request.
        today = dt.now(timezone.utc).date()

    dated = []
    for d in all_days or []:
        parsed = _parse_ymd_safe(d.get("date"))
        if parsed is not None and parsed >= today:
            dated.append((parsed, dict(d)))
    dated.sort(key=lambda pair: pair[0])
    window = [rec for _, rec in dated[:window_days]]
    return window, today.isoformat()
# -------------------- Main handler --------------------
def run():
    """Handle one CGI request: resolve the location, serve from cache or fetch.

    Steps:
      1. Load the config and parse the query string.
      2. Resolve coordinates from ``lat``/``lon``, a geocoding override,
         Open-Meteo geocoding, or Nominatim (in that order); 404 when nothing
         matches, 400 when no query was supplied.
      3. If a fresh cache file with the current schema exists, re-slice its
         stored days to "today" in the stored timezone and return it.
      4. Otherwise query each enabled provider (Open-Meteo first, to learn
         the location's timezone), aggregate the results, cache them and
         return them.

    Provider failures are logged to stderr and do not abort the request.
    Debug details are attached only to the response of a ``debug=1`` request
    and are never stored in, or replayed from, the cache.
    """
    conf = load_config()
    units = conf.get("units", "imperial")  # 'imperial' or 'metric'
    days = int(conf.get("days", 7))  # fetch horizon for providers
    # Fallback only; the real timezone is taken from Open-Meteo's response.
    # A missing "location" section must not abort the request.
    tz_fallback = (conf.get("location") or {}).get("timezone") or "UTC"
    user_agent = conf.get("user_agent") or "WeatherConsensus/1.0 (https://weather.thedarkelite.com, admin@thedarkelite.com)"
    nominatim_email = conf.get("nominatim_email")  # optional (polite)
    overrides = conf.get("geocoding_overrides") or {}

    q, lat_in, lon_in, debug_flag = parse_query()
    debug_info = {"attempts": []}

    # ---- Resolve coordinates ----
    if lat_in is not None and lon_in is not None:
        loc = {"name": f"{lat_in:.4f},{lon_in:.4f}", "lat": lat_in, "lon": lon_in}
        debug_info["geocode_source"] = "coords"
        debug_info["query"] = None
    elif q:
        debug_info["query"] = q
        g = lookup_override(q, overrides)
        if g:
            debug_info["attempts"].append({"geocoder": "override", "match": norm_key(q)})
        else:
            g = geocode_open_meteo_first(q, user_agent=user_agent, attempts_out=debug_info["attempts"])
            if not g:
                hint = "us" if (_zip_re.match(q) or " texas" in q.lower() or re.search(r",\s*[A-Za-z]{2}$", q)) else None
                g = geocode_nominatim_fallback(q, user_agent=user_agent, email=nominatim_email, country_hint=hint, attempts_out=debug_info["attempts"])
        if not g:
            return http_error("Location not found", status=404, debug=debug_info if debug_flag else None)
        display_name = _dedupe_label(g.get("name"), g.get("admin1"), g.get("country")) or q
        loc = {"name": display_name, "lat": g["lat"], "lon": g["lon"]}
        debug_info["geocode_source"] = g.get("source") or "open-meteo/nominatim/override"
    else:
        return http_error("Missing query. Provide q=city,state or lat & lon.", 400)

    # ---- Cache paths ----
    slug = slugify(f"{loc['name']}_{loc['lat']:.4f}_{loc['lon']:.4f}")
    loc_dir = os.path.join(DATA_ROOT, "locations", slug)
    raw_dir = os.path.join(loc_dir, "raw")
    ensure_dirs(raw_dir)
    cache_path = os.path.join(loc_dir, "latest.json")

    # ---- Try cache; if schema matches, re-slice to "today" ----
    cached_response = None
    if file_is_fresh(cache_path, CACHE_TTL_SECONDS):
        try:
            with open(cache_path, "r") as f:
                cached = json.load(f)
            if cached.get("schema_version") == SCHEMA_VERSION:
                tz_eff = cached.get("timezone") or tz_fallback
                all_days = cached.get("all_days") or cached.get("days") or []
                sliced, today_iso = slice_days_for_timezone(all_days, tz_eff, tz_fallback)
                cached = dict(cached)
                # Older cache files may carry the debug block of whichever
                # request wrote them; never replay it to another client.
                cached.pop("debug", None)
                cached["days"] = sliced
                if debug_flag:
                    cached["debug"] = {**debug_info, "tz_effective": tz_eff, "today_local_date": today_iso, "from_cache": True}
                cached_response = cached
        except Exception as e:
            print(f"Cache read error (ignored): {e}", file=sys.stderr)
    if cached_response is not None:
        # Emitted outside the try block so an output failure cannot fall
        # through to a second, freshly fetched response.
        return http_json(cached_response, status=200)

    # ---- Fetch from providers ----
    provider_lists = []
    date_tag = dt.now(timezone.utc).strftime("%Y-%m-%d")

    # (A) Open-Meteo FIRST, with timezone=auto to detect the location's tz
    tz_effective = tz_fallback
    try:
        if conf["providers"]["open_meteo"]["enabled"]:
            om_conf = dict(conf["providers"]["open_meteo"])
            lst, raw = fetch_open_meteo(om_conf, loc["lat"], loc["lon"], days, units, "auto", user_agent)
            provider_lists.append(lst)
            if isinstance(raw, dict):
                tz_effective = raw.get("timezone") or tz_fallback
            with open(os.path.join(raw_dir, f"open-meteo_{date_tag}.json"), "w") as f:
                json.dump(raw, f, indent=2)
    except Exception as e:
        print(f"OpenMeteo error: {e}", file=sys.stderr)

    # (B) Remaining providers get tz_effective for correct local-day bucketing
    secondary = (
        ("nws", fetch_nws, "nws", "NWS"),
        ("metno", fetch_metno, "metno", "MET Norway"),
        ("seventimer", fetch_seventimer, "7timer", "7Timer"),
    )
    for conf_key, fetcher, file_tag, label in secondary:
        try:
            provider_conf = conf["providers"][conf_key]
            if provider_conf["enabled"]:
                lst, raw = fetcher(provider_conf, loc["lat"], loc["lon"], days, units, tz_effective, user_agent)
                provider_lists.append(lst)
                with open(os.path.join(raw_dir, f"{file_tag}_{date_tag}.json"), "w") as f:
                    json.dump(raw, f, indent=2)
        except Exception as e:
            print(f"{label} error: {e}", file=sys.stderr)

    # ---- Aggregate (unfiltered), then compute the visible window ----
    aggregated_full = aggregate_days(provider_lists, units, max_days=None)
    days_view, today_iso = slice_days_for_timezone(aggregated_full, tz_effective, tz_fallback)

    payload = {
        "schema_version": SCHEMA_VERSION,
        "location": loc["name"],
        "lat": round(loc["lat"], 4),
        "lon": round(loc["lon"], 4),
        "timezone": tz_effective,  # forecast location timezone
        "generated_at_utc": dt.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "units": units,
        "all_days": aggregated_full,  # unfiltered days, kept for cache re-slicing
        "days": days_view,
    }

    # ---- Save cache BEFORE attaching debug info, so it is never persisted ----
    if aggregated_full:
        try:
            with open(cache_path, "w") as f:
                json.dump(payload, f, indent=2)
        except Exception as e:
            print(f"Cache write error (ignored): {e}", file=sys.stderr)
    else:
        # Every provider failed or returned nothing: do not pin an empty
        # forecast for the whole cache TTL.
        print("No provider data; response not cached", file=sys.stderr)

    if debug_flag:
        payload["debug"] = {
            **debug_info,
            "tz_effective": tz_effective,
            "today_local_date": today_iso,
            "first_3_dates_full": sorted({(d.get("date") or "").strip() for d in aggregated_full})[:3],
        }
    return http_json(payload, status=200)
# -------------------- Entrypoint --------------------
if __name__ == "__main__":
    # Last-resort boundary for the CGI process: any uncaught failure is
    # logged to stderr and reported to the client as a JSON 500 response.
    try:
        run()
    except Exception as exc:
        print(f"FATAL: {exc}", file=sys.stderr)
        http_error(f"Server error: {exc}", 500)