markowitz.data_providers

External market-data providers and the S&P 500 point-in-time universe.

This package layers a Polygon.io REST client and a thin yfinance adapter behind a uniform interface, exposing a make_provider factory and an SP500UniverseBuilder that produces survivorship-bias-aware membership snapshots. It is independent of the legacy markowitz.data package: that module still owns the on-disk Parquet cache and Fama-French utilities, while this one adds the remote OHLCV ingestion and universe construction needed for walk-forward research on a realistic equity universe.
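
The "uniform interface" can be pictured as a structural type that both providers satisfy. The sketch below is illustrative only: EODProvider and ConstantProvider are hypothetical names, and the package may not define any such protocol.

```python
from datetime import date
from typing import Protocol, runtime_checkable

import pandas as pd

OHLCV_COLUMNS = ["Open", "High", "Low", "Close", "Volume"]


@runtime_checkable
class EODProvider(Protocol):
    """Hypothetical structural type; not part of the documented package."""

    def get_eod(self, ticker: str, start: date, end: date) -> pd.DataFrame: ...


class ConstantProvider:
    """Toy provider returning a flat price series, for illustration only."""

    def get_eod(self, ticker: str, start: date, end: date) -> pd.DataFrame:
        # One row per business day, indexed by a DatetimeIndex named "Date".
        index = pd.date_range(
            pd.Timestamp(start), pd.Timestamp(end), freq="B", name="Date"
        )
        return pd.DataFrame(
            {column: 1.0 for column in OHLCV_COLUMNS}, index=index
        )


provider = ConstantProvider()
frame = provider.get_eod("AAPL", date(2024, 1, 2), date(2024, 1, 5))
```

Any object with a compatible get_eod passes isinstance(obj, EODProvider), which is one way downstream code could stay agnostic about the data source.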

PolygonAuthError

Bases: PolygonError

Raised on HTTP 401/403 — missing or invalid API key.

PolygonDataError

Bases: PolygonError

Raised when the response payload is missing expected fields or is empty.

PolygonError

Bases: Exception

Base class for every Polygon-originated failure.

PolygonProvider(api_key: str | None = None, session: httpx.Client | None = None, rpm: int = _STARTER_RPM)

Polygon.io REST adapter with point-in-time accuracy.

Parameters:

api_key (str | None, default: None)
    Polygon API key. If None, falls back to the POLYGON_API_KEY environment variable. A missing key raises PolygonAuthError immediately, so misconfiguration surfaces at construction time rather than on the first network call.

session (httpx.Client | None, default: None)
    Optional pre-built httpx.Client. When None, the provider owns the lifecycle and closes the session in close().

rpm (int, default: _STARTER_RPM)
    Requests-per-minute ceiling for the token bucket. Defaults to the Polygon Starter tier limit of 100.
Source code in src/markowitz/data_providers/polygon.py
def __init__(
    self,
    api_key: str | None = None,
    session: httpx.Client | None = None,
    rpm: int = _STARTER_RPM,
) -> None:
    self._api_key = api_key or os.environ.get("POLYGON_API_KEY", "")
    if not self._api_key:
        raise PolygonAuthError(
            "POLYGON_API_KEY is required to instantiate PolygonProvider"
        )
    self._owns_session = session is None
    self._session = session or httpx.Client(timeout=_HTTP_TIMEOUT)
    self._bucket = _TokenBucket(rpm=rpm)
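
The private _TokenBucket is not shown on this page. As a rough sketch of how a requests-per-minute ceiling can be enforced, the block below implements a simple token bucket with an injectable clock. TokenBucket, FakeClock, and the acquire method are hypothetical names chosen for illustration and should not be read as the library's implementation.

```python
import time
from typing import Callable


class TokenBucket:
    """Hypothetical stand-in for the private _TokenBucket."""

    def __init__(
        self,
        rpm: int,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        if rpm <= 0:
            raise ValueError("rpm must be positive")
        self._capacity = float(rpm)
        self._tokens = float(rpm)  # start with a full bucket
        self._rate = rpm / 60.0    # tokens replenished per second
        self._clock = clock
        self._sleep = sleep
        self._last = clock()

    def _refill(self) -> None:
        # Credit tokens for the time elapsed since the last refill.
        now = self._clock()
        elapsed = now - self._last
        self._tokens = min(self._capacity, self._tokens + elapsed * self._rate)
        self._last = now

    def acquire(self) -> float:
        """Take one token, sleeping if none is available; return seconds waited."""
        self._refill()
        waited = 0.0
        if self._tokens < 1.0:
            waited = (1.0 - self._tokens) / self._rate
            self._sleep(waited)
            self._refill()
        self._tokens -= 1.0
        return waited


class FakeClock:
    """Deterministic clock so the example never actually sleeps."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
bucket = TokenBucket(rpm=2, clock=clock, sleep=clock.sleep)
waits = [bucket.acquire() for _ in range(3)]
```

With rpm=2 the first two calls pass immediately and the third waits roughly 30 seconds of simulated time, which is the behavior a per-minute ceiling implies.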

get_eod(ticker: str, start: date, end: date) -> pd.DataFrame

Return daily OHLCV for ticker in the inclusive window [start, end].

Output is TitleCase (Open/High/Low/Close/Volume) with a tz-naive pandas.DatetimeIndex named Date. Close is split-adjusted (Polygon adjusted=true); Polygon's adjusted flag covers splits only, so dividends are not reflected in the series.

Source code in src/markowitz/data_providers/polygon.py
def get_eod(self, ticker: str, start: date, end: date) -> pd.DataFrame:
    """Return daily OHLCV for ``ticker`` in the inclusive window ``[start, end]``.

    Output is TitleCase (Open/High/Low/Close/Volume) with a tz-naive
    :class:`~pandas.DatetimeIndex` named ``Date``. Close is split-adjusted
    (Polygon ``adjusted=true``); Polygon's ``adjusted`` flag covers splits
    only, so dividends are not reflected in the series.
    """
    ticker = ticker.strip().upper()
    if start > end:
        raise ValueError(f"start ({start}) must be <= end ({end})")
    return self._fetch_aggs(ticker, start, end)
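
The private _fetch_aggs helper is not listed here. The following is a minimal sketch of how a list of Polygon aggregate bars could be normalized into the documented output shape. The function name aggs_to_frame is hypothetical, and the sketch assumes each bar carries the o/h/l/c/v keys seen in get_grouped_daily plus a t key holding a Unix timestamp in milliseconds.

```python
from typing import Any

import pandas as pd

OHLCV_COLUMNS = ["Open", "High", "Low", "Close", "Volume"]
RENAMES = {"o": "Open", "h": "High", "l": "Low", "c": "Close", "v": "Volume"}


def aggs_to_frame(results: list[dict[str, Any]]) -> pd.DataFrame:
    """Hypothetical helper: convert aggregate bars to a TitleCase OHLCV frame."""
    if not results:
        empty_index = pd.DatetimeIndex([], name="Date")
        return pd.DataFrame(columns=OHLCV_COLUMNS, index=empty_index)
    raw = pd.DataFrame(results)
    # Assumption: "t" is epoch milliseconds; normalize() drops the time of day
    # and the result stays tz-naive.
    dates = pd.to_datetime(raw["t"], unit="ms").dt.normalize()
    frame = raw.rename(columns=RENAMES)[OHLCV_COLUMNS].astype("float64")
    frame.index = pd.DatetimeIndex(dates, name="Date")
    return frame


sample = [
    {"o": 10, "h": 12, "l": 9, "c": 11, "v": 1000, "t": 1672722000000},
]
frame = aggs_to_frame(sample)
```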

get_grouped_daily(date_: date) -> pd.DataFrame

Grouped-daily snapshot of every actively-traded US stock on date_.

Index is the ticker symbol; columns are TitleCase OHLCV. Used by the S&P 500 universe builder to know which symbols actually traded on a given historical date.

Source code in src/markowitz/data_providers/polygon.py
def get_grouped_daily(self, date_: date) -> pd.DataFrame:
    """Grouped-daily snapshot of every actively-traded US stock on ``date_``.

    Index is the ticker symbol; columns are TitleCase OHLCV. Used by the
    S&P 500 universe builder to know which symbols actually traded on a
    given historical date.
    """
    path = f"/v2/aggs/grouped/locale/us/market/stocks/{date_.isoformat()}"
    payload = self._request("GET", path, params={"adjusted": "true"})
    results = payload.get("results") or []
    if not results:
        return pd.DataFrame(columns=list(_OHLCV_COLUMNS))
    rows: dict[str, dict[str, float]] = {}
    for bar in results:
        symbol = bar.get("T")
        if not symbol:
            continue
        rows[symbol] = {
            "Open": float(bar.get("o", 0.0)),
            "High": float(bar.get("h", 0.0)),
            "Low": float(bar.get("l", 0.0)),
            "Close": float(bar.get("c", 0.0)),
            "Volume": float(bar.get("v", 0.0)),
        }
    df = pd.DataFrame.from_dict(rows, orient="index")
    df.index.name = "ticker"
    return cast(pd.DataFrame, df)

get_ticker_meta(ticker: str) -> dict[str, Any]

Return the /v3/reference/tickers/{ticker} payload as a dict.

Source code in src/markowitz/data_providers/polygon.py
def get_ticker_meta(self, ticker: str) -> dict[str, Any]:
    """Return the ``/v3/reference/tickers/{ticker}`` payload as a dict."""
    ticker = ticker.strip().upper()
    payload = self._request("GET", f"/v3/reference/tickers/{ticker}")
    results = payload.get("results")
    if not isinstance(results, dict):
        raise PolygonDataError(f"No reference data returned for {ticker}")
    return results

PolygonRateLimitError

Bases: PolygonError

Raised after exhausting retries on HTTP 429.
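
The exception hierarchy lets callers catch a specific failure or any Polygon failure through the base class. The sketch below redefines the four classes as stand-ins with the documented base classes and maps responses to them. fetch_results and scripted are hypothetical helpers, and the retry count and the absence of backoff are assumptions rather than the provider's actual policy.

```python
from typing import Any, Callable


class PolygonError(Exception):
    """Stand-in for the documented base class."""


class PolygonAuthError(PolygonError):
    """HTTP 401/403."""


class PolygonDataError(PolygonError):
    """Payload missing expected fields or empty."""


class PolygonRateLimitError(PolygonError):
    """HTTP 429 after retries are exhausted."""


def fetch_results(
    send: Callable[[], tuple[int, dict[str, Any]]],
    max_retries: int = 3,
) -> list[dict[str, Any]]:
    """Hypothetical request loop: classify each response by status code."""
    for _attempt in range(max_retries + 1):
        status, payload = send()
        if status in (401, 403):
            raise PolygonAuthError(f"HTTP {status}: missing or invalid API key")
        if status == 429:
            continue  # a real client would wait before retrying
        results = payload.get("results")
        if not results:
            raise PolygonDataError("response has no results")
        return results
    raise PolygonRateLimitError(f"still rate-limited after {max_retries} retries")


def scripted(responses: list[tuple[int, dict[str, Any]]]) -> Callable[[], Any]:
    """Build a fake transport that replays canned (status, payload) pairs."""
    iterator = iter(responses)
    return lambda: next(iterator)


ok = fetch_results(scripted([(429, {}), (200, {"results": [{"T": "AAPL"}]})]))
```

Catching PolygonError around such a call handles all three subclasses at once, while catching PolygonAuthError alone lets configuration problems be reported separately.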

SP500UniverseBuilder(provider: PolygonProvider | YFinanceProvider | None = None)

Builds and caches point-in-time S&P 500 membership snapshots.

Parameters:

provider (PolygonProvider | YFinanceProvider | None, default: None)
    Either a PolygonProvider (preferred: enables PIT membership via grouped-daily) or a YFinanceProvider (fallback: emits a warning and returns the survivorship-biased static list).
Source code in src/markowitz/data_providers/sp500_universe.py
def __init__(
    self,
    provider: PolygonProvider | YFinanceProvider | None = None,
) -> None:
    self._provider = provider
    self._cache: dict[date, list[str]] = {}
    self._warned_fallback = False

get_membership_as_of(date_: date) -> list[str]

Return the approximated S&P 500 membership on date_.

When the configured provider exposes a working get_grouped_daily (Polygon path), the result is the intersection of CURRENT_SP500 with the symbols that actually traded on date_. With no provider, or when the provider's get_grouped_daily is missing or raises (yfinance fallback, network or API errors), the static list is returned and a warning is logged. When grouped-daily succeeds but comes back empty, a warning is logged and an empty list is returned. Results are cached per date.

Source code in src/markowitz/data_providers/sp500_universe.py
def get_membership_as_of(self, date_: date) -> list[str]:
    """Return the approximated S&P 500 membership on ``date_``.

    When the configured provider exposes a working ``get_grouped_daily``
    (Polygon path), the result is the intersection of :data:`CURRENT_SP500`
    with the symbols that actually traded on ``date_``. With no provider, or
    when the provider's ``get_grouped_daily`` is missing or raises (yfinance
    fallback, network or API errors), the static list is returned and a
    warning is logged. When grouped-daily succeeds but comes back empty, a
    warning is logged and an empty list is returned. Results are cached per
    date.
    """
    cached = self._cache.get(date_)
    if cached is not None:
        return list(cached)

    if self._provider is None:
        members = self._fallback_members()
    else:
        try:
            grouped = self._provider.get_grouped_daily(date_)
        except Exception as exc:
            logger.warning(
                "grouped-daily fetch failed for %s (%s); using static list",
                date_,
                exc,
            )
            members = self._fallback_members()
        else:
            if grouped.empty:
                logger.warning(
                    "grouped-daily empty for %s; returning empty membership",
                    date_,
                )
                members = []
            else:
                active = set(grouped.index)
                members = sorted(t for t in CURRENT_SP500 if t in active)

    self._cache[date_] = list(members)
    return list(members)
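
The core of the Polygon path is a set intersection between the static constituent list and the tickers that traded on the requested date. The sketch below reproduces that step with a stub provider. STATIC_LIST, StubProvider, and membership_as_of are hypothetical stand-ins, and the sorted fallback for a missing provider is an assumption about _fallback_members, which is not shown here.

```python
from datetime import date

import pandas as pd

# Hypothetical stand-in for CURRENT_SP500; "NEWCO" had not listed yet.
STATIC_LIST = ("MSFT", "AAPL", "NEWCO")


class StubProvider:
    """Fake provider returning a ticker-indexed frame, as get_grouped_daily does."""

    def get_grouped_daily(self, date_: date) -> pd.DataFrame:
        frame = pd.DataFrame(
            {"Close": [100.0, 50.0, 7.0]},
            index=["AAPL", "MSFT", "ZZZZ"],
        )
        frame.index.name = "ticker"
        return frame


def membership_as_of(provider, date_: date, static_list) -> list[str]:
    """Sketch of the intersection logic, without caching or logging."""
    if provider is None:
        return sorted(static_list)  # assumed fallback: the static list
    grouped = provider.get_grouped_daily(date_)
    if grouped.empty:
        return []
    active = set(grouped.index)
    return sorted(t for t in static_list if t in active)


members = membership_as_of(StubProvider(), date(2015, 6, 30), STATIC_LIST)
fallback = membership_as_of(None, date(2015, 6, 30), STATIC_LIST)
```

Because the candidate set is the current constituent list, this filter can only remove names that were not yet trading; it cannot restore companies that have since left the index, which is why the result is described as an approximation.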

get_membership_window(start: date, end: date, freq: str = 'ME') -> dict[date, list[str]]

Build membership at each rebalance date in [start, end].

freq follows pandas offset aliases; the default ME (month-end) matches the cadence used by most monthly walk-forward backtests. When no freq anchor falls inside [start, end], the window degenerates to {start, end}, so callers always get at least one anchor back (exactly one when start == end, two otherwise).

Source code in src/markowitz/data_providers/sp500_universe.py
def get_membership_window(
    self,
    start: date,
    end: date,
    freq: str = "ME",
) -> dict[date, list[str]]:
    """Build membership at each rebalance date in ``[start, end]``.

    ``freq`` follows pandas offset aliases; the default ``ME`` (month-end)
    matches the cadence used by most monthly walk-forward backtests. When
    no ``freq`` anchor falls inside ``[start, end]``, the window degenerates
    to ``{start, end}``, so callers always get at least one anchor back
    (exactly one when ``start == end``, two otherwise).
    """
    if start > end:
        raise ValueError(f"start ({start}) must be <= end ({end})")
    anchors = pd.date_range(start=start, end=end, freq=freq)
    if len(anchors) == 0:
        anchors = pd.DatetimeIndex(
            [pd.Timestamp(start), pd.Timestamp(end)]
        ).unique()
    result: dict[date, list[str]] = {}
    for ts in anchors:
        d = ts.date() if hasattr(ts, "date") else ts
        result[d] = self.get_membership_as_of(d)
    return result
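
To see which dates the window produces, the anchor computation can be run on its own. The sketch below isolates it in a hypothetical rebalance_anchors function. It passes a pd.offsets.MonthEnd object instead of the "ME" string because that alias is only accepted by pandas 2.2 and later, while the offset object behaves the same across versions.

```python
from datetime import date

import pandas as pd


def rebalance_anchors(start: date, end: date, freq=None) -> list[date]:
    """Hypothetical helper mirroring the anchor logic of get_membership_window."""
    if start > end:
        raise ValueError(f"start ({start}) must be <= end ({end})")
    offset = pd.offsets.MonthEnd() if freq is None else freq
    anchors = pd.date_range(
        start=pd.Timestamp(start), end=pd.Timestamp(end), freq=offset
    )
    if len(anchors) == 0:
        # No period end inside the window: fall back to its endpoints,
        # de-duplicated so start == end yields a single anchor.
        anchors = pd.DatetimeIndex(
            [pd.Timestamp(start), pd.Timestamp(end)]
        ).unique()
    return [ts.date() for ts in anchors]


monthly = rebalance_anchors(date(2024, 1, 15), date(2024, 4, 10))
short_window = rebalance_anchors(date(2024, 3, 5), date(2024, 3, 20))
single_day = rebalance_anchors(date(2024, 3, 5), date(2024, 3, 5))
```

Here monthly holds the three month-ends from January through March 2024, short_window holds its two endpoints, and single_day holds one date.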

YFinanceProvider(inner: Any = None)

yfinance-backed provider matching PolygonProvider's surface.

Parameters:

inner (Any, default: None)
    Optional pre-built provider exposing fetch(ticker, start, end) and returning a frame with a close column. Defaults to markowitz.data.providers.YFinanceProvider. Exposed as a hook so tests never touch the network.
Source code in src/markowitz/data_providers/yfinance.py
def __init__(self, inner: Any = None) -> None:
    if inner is None:
        from markowitz.data.providers import (  # noqa: PLC0415
            YFinanceProvider as _LegacyYF,
        )

        inner = _LegacyYF()
    self._inner = inner

get_eod(ticker: str, start: date, end: date) -> pd.DataFrame

Return daily OHLCV via yfinance, normalized to TitleCase columns.

yfinance returns Open/High/Low/Close/Volume natively when called through yf.download, but the legacy adapter in this repo collapses the frame down to a single close column. When the inner provider returns the full TitleCase OHLCV columns, they are passed through unchanged; otherwise we fall back to a close-only frame (Open/High/Low filled with NaN, Volume with 0).

Source code in src/markowitz/data_providers/yfinance.py
def get_eod(self, ticker: str, start: date, end: date) -> pd.DataFrame:
    """Return daily OHLCV via yfinance, normalized to TitleCase columns.

    yfinance returns ``Open/High/Low/Close/Volume`` natively when called
    through ``yf.download``, but the legacy adapter in this repo collapses
    the frame down to a single ``close`` column. When the inner provider
    returns the full TitleCase OHLCV columns, they are passed through
    unchanged; otherwise we fall back to a close-only frame (Open/High/Low
    filled with NaN, Volume with 0).
    """
    ticker = ticker.strip().upper()
    if start > end:
        raise ValueError(f"start ({start}) must be <= end ({end})")
    frame = self._inner.fetch(ticker, start, end)
    if "Close" in frame.columns and {"Open", "High", "Low", "Volume"}.issubset(
        frame.columns
    ):
        ohlcv = frame[list(_OHLCV_COLUMNS)].copy()
    else:
        close = frame["close"] if "close" in frame.columns else frame["Close"]
        ohlcv = pd.DataFrame(
            {
                "Open": pd.Series(index=close.index, dtype="float64"),
                "High": pd.Series(index=close.index, dtype="float64"),
                "Low": pd.Series(index=close.index, dtype="float64"),
                "Close": close.astype("float64"),
                "Volume": pd.Series(0.0, index=close.index, dtype="float64"),
            }
        )
    ohlcv.index = pd.DatetimeIndex(ohlcv.index, name="Date")
    return cast(pd.DataFrame, ohlcv)
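
The two branches are easiest to compare on small frames. The sketch below lifts the normalization into a standalone, hypothetical normalize_ohlcv function and feeds it a close-only frame of the kind the legacy adapter returns, along with a full OHLCV frame.

```python
import pandas as pd

OHLCV_COLUMNS = ("Open", "High", "Low", "Close", "Volume")


def normalize_ohlcv(frame: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper mirroring the column handling in get_eod."""
    if set(OHLCV_COLUMNS).issubset(frame.columns):
        # Full OHLCV available: pass the columns through in canonical order.
        ohlcv = frame[list(OHLCV_COLUMNS)].copy()
    else:
        close = frame["close"] if "close" in frame.columns else frame["Close"]
        missing = pd.Series(index=close.index, dtype="float64")  # all NaN
        ohlcv = pd.DataFrame(
            {
                "Open": missing,
                "High": missing,
                "Low": missing,
                "Close": close.astype("float64"),
                "Volume": pd.Series(0.0, index=close.index),
            }
        )
    ohlcv.index = pd.DatetimeIndex(ohlcv.index, name="Date")
    return ohlcv


dates = pd.to_datetime(["2024-01-02", "2024-01-03"])
close_only = pd.DataFrame({"close": [10.0, 11.0]}, index=dates)
full = pd.DataFrame(
    {"Volume": [5.0, 6.0], "Close": [10.0, 11.0], "Low": [9.0, 10.0],
     "High": [11.0, 12.0], "Open": [9.5, 10.5]},
    index=dates,
)
from_close = normalize_ohlcv(close_only)
from_full = normalize_ohlcv(full)
```

Downstream code that relies on Volume should keep in mind that the fallback reports 0 rather than NaN, so a zero there does not necessarily mean that no shares traded.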

make_provider(api_key: str | None = None, *, inner_yfinance: Any = None) -> PolygonProvider | YFinanceProvider

Return a Polygon provider if a key is configured, otherwise yfinance.

Parameters:

api_key (str | None, default: None)
    Explicit Polygon API key. When None, the factory falls back to the POLYGON_API_KEY environment variable.

inner_yfinance (Any, default: None)
    Optional pre-built provider passed straight through to YFinanceProvider when the fallback path is taken. Useful for tests that need to inject a stub instead of touching the network.
Source code in src/markowitz/data_providers/factory.py
def make_provider(
    api_key: str | None = None,
    *,
    inner_yfinance: Any = None,
) -> PolygonProvider | YFinanceProvider:
    """Return a Polygon provider if a key is configured, otherwise yfinance.

    Parameters
    ----------
    api_key:
        Explicit Polygon API key. When ``None`` the factory falls back to the
        ``POLYGON_API_KEY`` environment variable.
    inner_yfinance:
        Optional pre-built provider passed straight through to
        :class:`YFinanceProvider` when the fallback path is taken — useful
        for tests that need to inject a stub instead of touching the network.
    """
    key = (api_key or os.environ.get("POLYGON_API_KEY", "")).strip()
    if key:
        try:
            return PolygonProvider(api_key=key)
        except PolygonAuthError:
            # Construction failed despite a non-empty key — fall back rather
            # than propagate, so the demo keeps running with yfinance data.
            logger.warning(
                "POLYGON_API_KEY present but PolygonProvider rejected it; "
                "falling back to yfinance"
            )
    else:
        logger.info(
            "POLYGON_API_KEY not set; using yfinance provider (no PIT universe)"
        )
    return YFinanceProvider(inner=inner_yfinance)
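
The key-resolution rule in the factory can be exercised without constructing either provider. The sketch below isolates it in a hypothetical resolve_backend function that returns a label instead of an instance; the env parameter is an addition for testability and is not part of make_provider.

```python
import os
from typing import Mapping, Optional


def resolve_backend(
    api_key: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Hypothetical helper: report which backend the key resolution selects."""
    source = os.environ if env is None else env
    # Same expression as the factory: explicit key first, then the
    # environment, with surrounding whitespace stripped afterwards.
    key = (api_key or source.get("POLYGON_API_KEY", "")).strip()
    return "polygon" if key else "yfinance"


explicit = resolve_backend("abc123", env={})
from_env = resolve_backend(None, env={"POLYGON_API_KEY": "abc123"})
unset = resolve_backend(None, env={})
blank = resolve_backend("   ", env={"POLYGON_API_KEY": "abc123"})
```

One consequence of stripping after the `or` is that a whitespace-only explicit key is truthy, so it shadows the environment variable and then strips to empty. In this sketch that case resolves to the yfinance fallback.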