News Alpha Trader¶
An end-to-end LLM-driven trading notebook. Overnight news → structured sentiment signals → paper (or live) orders via Alpaca → cost-adjusted P&L, all persisted to a local DuckDB so the data accumulates across sessions.
This example is the opposite of the toy notebooks in this directory — it talks to real APIs, spends real tokens, and (if you opt in) places real orders.
What it does¶
- Pulls overnight news for a watchlist (Alpaca Market Data — Benzinga feed)
- Extracts structured sentiment per headline via the LLM with `@output_schema` (validated JSON, auto-retried on failure)
- Fetches latest prices and computes available capital
- Generates a trade plan under hard risk limits (max position size, daily trade cap, ticker whitelist)
- Submits orders — paper by default, live only behind a two-key lock
- Reconciles fills, records realized costs (slippage, TAF, spread)
- Reports cost-adjusted P&L — the only number that matters: did the strategy beat its own API and transaction bill?
- Has the LLM write a daily journal entry with the day's lessons
Required API keys¶
Set these in the Runtime panel:
| Key | Used for |
|---|---|
| `ALPACA_API_KEY` | News + prices + orders |
| `ALPACA_API_SECRET` | Same |
| `ANTHROPIC_API_KEY` or `OPENAI_API_KEY` | Signal extraction + daily note |
Alpaca's free tier gives 200 req/min, 7+ years of history, and a 15-minute REST delay. Plenty for daily rebalancing.
Paper vs live¶
The notebook ships in paper mode. Every code path is identical between paper and live; only the Alpaca endpoint URL differs. To flip to live you must:
- Edit `helpers.py` → set `Config.MODE = "live"`
- Edit `helpers.py` → set `Config.I_UNDERSTAND_THIS_IS_REAL_MONEY = True`
- Your Alpaca account must be funded
The `place_orders` cell refuses to run live unless both flags are set. This is intentional: flipping to live takes two deliberate edits.
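A minimal sketch of that gate, mirroring the runtime check in the `place_orders` cell below:

```python
# Sketch of the two-key live gate. The authoritative check lives in
# the "Place orders" cell; this is just the shape of it.
if Config.MODE == "live" and not Config.I_UNDERSTAND_THIS_IS_REAL_MONEY:
    raise RuntimeError(
        "Config.MODE is 'live' but I_UNDERSTAND_THIS_IS_REAL_MONEY is False. "
        "Edit both flags in helpers.py on purpose before trading live."
    )
```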
Risk limits (hard, enforced in code)¶
Adjust these in the `Config` class in `helpers.py`:

- `MAX_POSITION_USD` — per-ticker notional cap (default $500)
- `MAX_TOTAL_EXPOSURE_USD` — portfolio notional cap (default $2000)
- `MAX_DAILY_TRADES` — daily trade count cap (default 10)
- `TICKER_WHITELIST` — the LLM cannot place orders in tickers outside this list, even if a headline mentions one
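These caps drive position sizing in the risk-check cell: target notional is |sentiment × confidence| × `MAX_POSITION_USD`, clamped by remaining portfolio headroom. A quick worked example with the defaults (numbers illustrative):

```python
# Sizing rule from the "Risk check & trade plan" cell, with defaults.
sentiment, confidence, price = 0.8, 0.9, 185.0
strength = sentiment * confidence                # 0.72
target_usd = min(abs(strength) * 500.0, 2000.0)  # $360, under the $500 cap
qty = int(target_usd // price)                   # 1 share
```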
DuckDB schema¶
One file, `trading.db`, next to the notebook. Tables:

- `news_raw` — raw headlines (idempotent insert by `article_id`)
- `signals` — extracted sentiment per article
- `prices` — daily OHLC per ticker
- `orders` — every order attempt, expected cost, Alpaca order id
- `trades` — reconciled fills with realized cost
- `positions` — current positions (rebuilt from `trades`)
- `costs` — every dollar spent: LLM tokens, market data, commissions, TAF fees, slippage
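Because it's a plain DuckDB file, you can poke at the accumulated history outside the notebook; a minimal sketch (assumes the `duckdb` Python package):

```python
import duckdb

# Read-only peek at the ledger — safe while the notebook is closed.
conn = duckdb.connect("trading.db", read_only=True)
print(conn.execute("SELECT COUNT(*) FROM signals").fetchone()[0])
print(conn.execute("SELECT source, ROUND(SUM(usd), 4) FROM costs GROUP BY source").fetchall())
conn.close()
```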
Running¶
Re-run tomorrow — `fetch_news` only pulls new articles, and `place_orders` is idempotent by `signal_id`. The DB grows over time; so does the history you can analyze.
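Both cells use the same idempotency pattern: derive a deterministic key and let the primary key reject duplicates. A sketch of the shape (uses `open_db()` from the helpers cell; values illustrative):

```python
import hashlib

conn = open_db()
# Deterministic key per (article, ticker) — re-running is a no-op.
signal_id = hashlib.sha256(b"12345:AAPL").hexdigest()[:16]
try:
    conn.execute(
        "INSERT INTO signals (signal_id, article_id, ticker, sentiment, confidence) "
        "VALUES (?, ?, ?, ?, ?)",
        [signal_id, 12345, "AAPL", 0.5, 0.8],
    )
except Exception:
    pass  # primary-key conflict: this signal is already persisted
conn.close()
```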
⚠️ Disclaimers¶
- This is not financial advice. The strategy is a demo. It may lose money. Backtest thoroughly before live trading.
- LLM outputs are non-deterministic even with `temperature=0.0`. A prompt injection in a news headline could theoretically shift a signal — the ticker whitelist is the hard backstop.
- Paper fills are instantaneous and slippage-free. Live fills are not. The cost model applies estimated slippage to both modes so the numbers stay honest.
- You are responsible for understanding every line of the `place_orders` cell before flipping to live.
Read before running¶
This notebook is the first thing you see when you open the project. It does
nothing computationally — the real work starts in helpers.
Key facts¶
- **Real money mode requires two opt-ins.** This notebook can place real orders on your Alpaca account when both `Config.MODE == "live"` and `Config.I_UNDERSTAND_THIS_IS_REAL_MONEY == True`. Both live in `helpers.py` and default to `"paper"` / `False`. You must edit both on purpose to trade live.
- **Every LLM call costs real money.** Every market-data call counts against your Alpaca rate limit (200/min on the free tier). The `costs` cell tracks both — check it before you leave the notebook running unattended.
- **Paper fills aren't realistic.** Paper fills on Alpaca are instantaneous and slippage-free, which is not how real markets work. We apply a modeled slippage cost even in paper mode so the cost-adjusted P&L doesn't lie when you eventually flip to live.
- **LLM outputs are non-deterministic.** A hostile news headline could in principle influence a signal. We enforce a ticker whitelist (`Config.TICKER_WHITELIST`) at the order layer — the LLM cannot place orders in tickers outside that list regardless of what the signal says.
- **Nothing here is financial advice.** The strategy is a demo. Backtest on historical data (see `pnl_report`) before trusting it with capital.
Helpers & Config¶
kind python
# @name Helpers & Config
# Module cell — pure defs + literal constants. Downstream cells import
# Config, the DuckDB/Alpaca factories, and the cost estimators by name.
#
# Keeping ALL connection and policy knobs in one place is deliberate:
# flipping paper→live, widening the watchlist, or tightening risk is a
# single-cell edit. The DB schema also lives here so every downstream
# cell can assume the tables exist after this cell runs once.
import datetime as dt
import json
import os
from decimal import Decimal
from pathlib import Path
class Config:
# --- Mode gate -----------------------------------------------------
# Set MODE="live" AND I_UNDERSTAND_THIS_IS_REAL_MONEY=True to place
# real orders. place_orders refuses to trade live without both.
MODE = "paper"
I_UNDERSTAND_THIS_IS_REAL_MONEY = False
# --- Universe ------------------------------------------------------
# Hard whitelist. place_orders drops any proposed trade whose ticker
# is not in this list, regardless of what a signal claims.
TICKER_WHITELIST = ("AAPL", "MSFT", "NVDA", "GOOGL", "AMZN")
# --- Risk limits ---------------------------------------------------
MAX_POSITION_USD = 500.0
MAX_TOTAL_EXPOSURE_USD = 2000.0
MAX_DAILY_TRADES = 10
MIN_CONFIDENCE = 0.7
MIN_ABS_SENTIMENT = 0.4
# --- Cost model ----------------------------------------------------
# LLM pricing is fetched from LiteLLM's public catalog at runtime —
# see _fetch_litellm_pricing() below. Fallback rate used only when
# the catalog can't be reached AND the model is unknown; matches
# mid-tier Sonnet pricing so we overstate rather than log $0.
FALLBACK_LLM_PRICE_USD_PER_M = (3.0, 15.0)
# Alpaca equities: $0 commission. Regulatory TAF only on sells.
TAF_USD_PER_SHARE = 0.000166
TAF_MAX_USD_PER_TRADE = 8.30
# Assumed half-spread / market impact applied to BOTH paper and
# live fills so cost-adjusted P&L stays honest when you flip modes.
SLIPPAGE_BPS = 5.0
# --- Storage -------------------------------------------------------
# "." resolves to the notebook directory at runtime.
DB_FILENAME = "trading.db"
LOOKBACK_DAYS = 30
def db_path() -> Path:
"""Where the DuckDB file lives (notebook directory)."""
return Path(".") / Config.DB_FILENAME
def open_db():
"""Open (or create) the DuckDB file and ensure the schema exists.
Connection is short-lived: each cell gets its own. DuckDB handles
concurrent read-only + single-writer access fine for this scale.
"""
import duckdb
conn = duckdb.connect(str(db_path()))
_ensure_schema(conn)
return conn
def _ensure_schema(conn) -> None:
"""Idempotent DDL. Run on every open_db() call — CREATE IF NOT EXISTS."""
conn.execute(
"""
CREATE TABLE IF NOT EXISTS news_raw (
article_id BIGINT PRIMARY KEY,
ticker VARCHAR NOT NULL,
headline VARCHAR NOT NULL,
summary VARCHAR,
url VARCHAR,
source VARCHAR,
published_at TIMESTAMP NOT NULL,
ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS signals (
signal_id VARCHAR PRIMARY KEY,
article_id BIGINT NOT NULL,
ticker VARCHAR NOT NULL,
sentiment DOUBLE NOT NULL,
confidence DOUBLE NOT NULL,
theme VARCHAR,
reasoning VARCHAR,
extracted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
input_tokens INTEGER,
output_tokens INTEGER,
model VARCHAR
);
CREATE TABLE IF NOT EXISTS prices (
ticker VARCHAR NOT NULL,
ts TIMESTAMP NOT NULL,
open DOUBLE, high DOUBLE, low DOUBLE, close DOUBLE,
volume BIGINT,
PRIMARY KEY (ticker, ts)
);
CREATE TABLE IF NOT EXISTS orders (
order_id VARCHAR PRIMARY KEY,
signal_id VARCHAR,
ticker VARCHAR NOT NULL,
side VARCHAR NOT NULL,
qty DOUBLE NOT NULL,
mode VARCHAR NOT NULL,
submitted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status VARCHAR NOT NULL,
expected_cost_usd DOUBLE,
alpaca_client_order_id VARCHAR
);
CREATE TABLE IF NOT EXISTS trades (
trade_id VARCHAR PRIMARY KEY,
order_id VARCHAR NOT NULL,
ticker VARCHAR NOT NULL,
side VARCHAR NOT NULL,
qty DOUBLE NOT NULL,
fill_price DOUBLE NOT NULL,
fill_ts TIMESTAMP NOT NULL,
realized_cost_usd DOUBLE
);
CREATE TABLE IF NOT EXISTS positions (
ticker VARCHAR PRIMARY KEY,
qty DOUBLE NOT NULL,
avg_cost DOUBLE NOT NULL,
last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS costs (
cost_id VARCHAR PRIMARY KEY,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
source VARCHAR NOT NULL,
detail_json VARCHAR,
usd DOUBLE NOT NULL,
signal_id VARCHAR,
order_id VARCHAR
);
"""
)
def alpaca_credentials() -> tuple[str, str]:
"""Pull Alpaca key + secret from the notebook env. Raises if missing."""
key = os.environ.get("ALPACA_API_KEY")
secret = os.environ.get("ALPACA_API_SECRET")
if not key or not secret:
raise RuntimeError(
"ALPACA_API_KEY / ALPACA_API_SECRET not set. "
"Add them in the Runtime panel before running data/trade cells."
)
return key, secret
def alpaca_trading_client():
"""Trading client pinned to the current Config.MODE.
``paper=True`` routes to paper-api.alpaca.markets; False routes to
api.alpaca.markets (live). We never silently swap modes at call
time — the caller must own the mode decision.
"""
from alpaca.trading.client import TradingClient
key, secret = alpaca_credentials()
return TradingClient(key, secret, paper=(Config.MODE == "paper"))
def alpaca_data_clients() -> tuple:
"""News + historical bar clients. Data is mode-independent."""
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.historical.news import NewsClient
key, secret = alpaca_credentials()
return StockHistoricalDataClient(key, secret), NewsClient(key, secret)
# Module-wide cache for the pricing catalog; declared before the
# function that mutates it via ``global``.
_litellm_pricing_cache: dict | None = None

_LITELLM_PRICING_URL = (
    "https://raw.githubusercontent.com/BerriAI/litellm/main/"
    "model_prices_and_context_window.json"
)
def _fetch_litellm_pricing() -> dict:
"""Download LiteLLM's pricing catalog. Cached module-wide after first hit.
LiteLLM maintains per-token USD rates for ~2000 models. If the fetch
fails (offline, 403, etc.) we cache an empty dict so we don't retry
on every cost lookup — the fallback rate takes over downstream.
"""
global _litellm_pricing_cache # noqa: PLW0603
if _litellm_pricing_cache is not None:
return _litellm_pricing_cache
import httpx
try:
_litellm_pricing_cache = httpx.get(_LITELLM_PRICING_URL, timeout=5.0).json()
except Exception as exc:
print(f"warning: LiteLLM pricing fetch failed ({exc}); using fallback rate")
_litellm_pricing_cache = {}
return _litellm_pricing_cache
def active_llm_model() -> str | None:
"""The model the Runtime panel last wrote to notebook.toml [ai].model.
Returns None before the user picks one — callers then fall back to the
conservative rate below. Notebook directory is CWD at cell execution
time, so plain relative-path tomllib is enough.
"""
import tomllib
try:
with open("notebook.toml", "rb") as f:
return tomllib.load(f).get("ai", {}).get("model")
except Exception:
return None
def estimate_llm_cost(
input_tokens: int, output_tokens: int, model: str | None = None
) -> float:
"""Dollar cost of a single LLM call.
Pricing comes from LiteLLM's public catalog; the model defaults to
whatever the Runtime panel selected. Unknown models fall back to
FALLBACK_LLM_PRICE_USD_PER_M so we overstate rather than log $0.
"""
if model is None:
model = active_llm_model()
entry = _fetch_litellm_pricing().get(model or "", {})
in_per_token = entry.get("input_cost_per_token")
out_per_token = entry.get("output_cost_per_token")
if in_per_token is None or out_per_token is None:
in_m, out_m = Config.FALLBACK_LLM_PRICE_USD_PER_M
return (input_tokens / 1_000_000.0) * in_m + (output_tokens / 1_000_000.0) * out_m
return input_tokens * in_per_token + output_tokens * out_per_token
def estimate_trade_cost(qty: float, side: str, price: float) -> dict:
"""Expected cost of placing an order.
Three components:
- commission (0 on Alpaca equities)
- TAF fee (sells only, tiny)
- slippage (Config.SLIPPAGE_BPS applied to notional, one-way)
Applied to paper trades too — that's the point, so the backtest
doesn't tell you a pleasant lie.
"""
notional = abs(qty) * price
slippage_usd = notional * (Config.SLIPPAGE_BPS / 10_000.0)
taf_usd = 0.0
if side.lower() == "sell":
taf_usd = min(
abs(qty) * Config.TAF_USD_PER_SHARE,
Config.TAF_MAX_USD_PER_TRADE,
)
commission_usd = 0.0
total = commission_usd + taf_usd + slippage_usd
return {
"commission_usd": commission_usd,
"taf_usd": taf_usd,
"slippage_usd": slippage_usd,
"total_usd": total,
}
def record_cost(
conn,
*,
source: str,
usd: float,
detail: dict,
signal_id: str | None = None,
order_id: str | None = None,
) -> None:
"""Append a single row to the costs ledger."""
import uuid as _uuid
conn.execute(
"""
INSERT INTO costs (cost_id, source, detail_json, usd, signal_id, order_id)
VALUES (?, ?, ?, ?, ?, ?)
""",
[_uuid.uuid4().hex, source, json.dumps(detail, default=str), usd, signal_id, order_id],
)
def today_utc() -> dt.date:
return dt.datetime.now(dt.timezone.utc).date()
def watchlist_str() -> str:
"""Alpaca's symbols= param wants a comma-joined list."""
return ",".join(Config.TICKER_WHITELIST)
Fetch overnight news¶
kind python
# @name Fetch overnight news
# Pulls recent Benzinga headlines for every whitelisted ticker and
# upserts into the ``news_raw`` table. Idempotent — re-running the
# cell only inserts articles we haven't seen before, so the DB grows
# monotonically across sessions.
import datetime as dt
import pandas as pd
from alpaca.data.requests import NewsRequest
conn = open_db()
_stock_client, news_client = alpaca_data_clients()
# Only pull since the newest article we already have for any of the
# whitelisted tickers. First run pulls Config.LOOKBACK_DAYS back.
latest_ts_row = conn.execute(
"SELECT MAX(published_at) FROM news_raw WHERE ticker IN ({})".format(
",".join(f"'{t}'" for t in Config.TICKER_WHITELIST)
)
).fetchone()
latest_ts = latest_ts_row[0] if latest_ts_row and latest_ts_row[0] else None
if latest_ts is None:
since = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=Config.LOOKBACK_DAYS)
else:
# +1 second so we don't re-fetch the boundary article.
since = (latest_ts + dt.timedelta(seconds=1)).replace(tzinfo=dt.timezone.utc)
request = NewsRequest(
symbols=watchlist_str(),
start=since,
limit=50,
include_content=False,
)
response = news_client.get_news(request)
raw_news = response.data.get("news", []) if hasattr(response, "data") else list(response)
inserted = 0
for article in raw_news:
# alpaca-py returns a NewsSet on some SDK versions and a list on
# others — normalize to the underlying attrs.
article_id = getattr(article, "id", None)
headline = getattr(article, "headline", "") or ""
summary = getattr(article, "summary", "") or ""
url = getattr(article, "url", "") or ""
source = getattr(article, "source", "") or ""
symbols = getattr(article, "symbols", []) or []
created_at = getattr(article, "created_at", None) or getattr(article, "published_at", None)
if article_id is None or created_at is None:
continue
# An article can mention multiple tickers; explode into one row per
# (article, ticker) so downstream joins are natural.
for ticker in symbols:
if ticker not in Config.TICKER_WHITELIST:
continue
try:
conn.execute(
"""
INSERT INTO news_raw
(article_id, ticker, headline, summary, url, source, published_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
[article_id, ticker, headline, summary, url, source, created_at],
)
inserted += 1
except Exception:
# Primary key conflict — already have this article. Fine.
pass
# Log "ingestion cost" as zero but record the row so the ledger has a
# complete audit trail of every external call.
record_cost(
conn,
source="alpaca_news",
usd=0.0,
detail={"articles_seen": len(raw_news), "inserted": inserted, "since": str(since)},
)
# What's unprocessed — used by extract_signals.
unprocessed = conn.execute(
"""
SELECT n.article_id, n.ticker, n.headline, n.summary, n.published_at
FROM news_raw n
LEFT JOIN signals s ON s.article_id = n.article_id AND s.ticker = n.ticker
WHERE s.signal_id IS NULL
ORDER BY n.published_at DESC
LIMIT 40
"""
).fetchdf()
conn.close()
news_ingest = {
"new_rows": inserted,
"unprocessed_count": len(unprocessed),
"oldest_unprocessed": (
str(unprocessed["published_at"].min()) if len(unprocessed) else None
),
}
print(
f"news_raw: inserted {inserted} new rows; {len(unprocessed)} articles "
f"awaiting signal extraction."
)
# Expose ``unprocessed`` so the prompt cell can reference {{ unprocessed }}.
unprocessed
Turn news into signals¶
We've pulled raw headlines from the news API. The cell below is a
prompt cell — it sends each headline to an LLM with an
@output_schema that forces a structured JSON response (ticker,
direction, confidence). The schema-validated result becomes a
content-addressed artifact like any Python cell's output, so
identical headlines + identical prompt template = cache hit, no
extra API call.
After signal extraction, the next few cells persist them, fetch matching price data, and run risk checks before any orders go out.
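Conceptually, `@output_schema` plus `@validate_retries` behaves like the sketch below — the helper names are hypothetical, since the real loop lives in the notebook runtime:

```python
import json

from jsonschema import ValidationError, validate  # pip install jsonschema

def extract_with_retries(call_llm, prompt: str, schema: dict, retries: int = 3) -> dict:
    """Re-ask the model until its JSON parses and matches the schema.

    ``call_llm`` is a hypothetical (prompt -> str) provider call standing
    in for whatever the runtime actually does.
    """
    last_error: Exception | None = None
    for _ in range(retries + 1):
        raw = call_llm(prompt)
        try:
            payload = json.loads(raw)
            validate(instance=payload, schema=schema)
            return payload  # schema-valid — becomes the cell's artifact
        except (json.JSONDecodeError, ValidationError) as exc:
            last_error = exc
            prompt += f"\n\nYour last reply was invalid ({exc}). Return only schema-valid JSON."
    raise RuntimeError(f"no schema-valid response after {retries} retries: {last_error}")
```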
signals_batch¶
kind prompt
Prompt cell — response intentionally excluded from export.
# @name signals_batch
# @temperature 0.0
# @max_tokens 4096
# @system You are a quantitative-trading analyst extracting market-impact signals from news headlines. Return a row for every input article — the risk filter downstream drops weak signals, so do not pre-filter here.
# @output_schema {"type": "object", "properties": {"signals": {"type": "array", "items": {"type": "object", "properties": {"article_id": {"type": "integer"}, "ticker": {"type": "string"}, "sentiment": {"type": "number", "minimum": -1, "maximum": 1}, "confidence": {"type": "number", "minimum": 0, "maximum": 1}, "theme": {"type": "string", "enum": ["earnings", "guidance", "product", "regulatory", "macro", "m_and_a", "legal", "people", "other"]}, "reasoning": {"type": "string"}}, "required": ["article_id", "ticker", "sentiment", "confidence", "theme", "reasoning"]}}}, "required": ["signals"]}
# @validate_retries 3
#
# Extracts structured sentiment for each article in the unprocessed
# batch. The schema enums are deliberately narrow — a tighter enum
# means cleaner aggregation SQL downstream. The validate-and-retry
# loop kicks in when the provider falls back to json_object mode
# (Anthropic, Gemini, Mistral) and the response doesn't quite match.
#
# Prompt-design note: earlier prompts told the model to "err toward
# low confidence on ambiguous headlines." Models read that as "omit
# the row" and returned empty arrays on unglamorous news batches.
# The current phrasing is explicit — return one row per article,
# encode uncertainty in the confidence score, don't drop articles.
# Config.MIN_CONFIDENCE + MIN_ABS_SENTIMENT at the risk layer are
# where the actual trading threshold lives.
Return exactly one signal per input article — do NOT omit rows. Encode
uncertainty in the `confidence` field. The downstream risk filter
drops low-confidence rows before any trade is placed, so your job
here is just to characterize every headline we fetched.
For each article, emit:
- `article_id` and `ticker` — copied verbatim from the input row.
- `sentiment` in [-1, 1]: -1 = clearly bearish for this ticker,
+1 = clearly bullish, 0 = neutral / not directionally relevant.
- `confidence` in [0, 1]:
- > 0.8 for clear-cut material news (earnings beat, explicit
guidance, confirmed M&A, major product launch, regulatory
action with a named target).
- 0.5–0.8 for solid analyst coverage, sector news with clear
ticker impact, leadership moves with stated effect.
- 0.2–0.5 for rumors, side mentions, ambiguous macro.
- < 0.2 for headlines that aren't really about this ticker
(cross-references, generic market commentary). Still return
the row — use 0 sentiment and explain in `reasoning`.
- `theme` — pick the single best enum value.
- `reasoning` — one short sentence explaining the score.
Do NOT infer signals for tickers other than the one on each row.
Articles:
{{ unprocessed }}
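For reference, a schema-conforming response has this shape (values illustrative):

```python
# One row per input article, matching the @output_schema above.
signals_batch_example = {
    "signals": [
        {
            "article_id": 31415926,
            "ticker": "NVDA",
            "sentiment": 0.7,
            "confidence": 0.85,
            "theme": "product",
            "reasoning": "Named-customer accelerator launch is clearly bullish.",
        },
        {
            "article_id": 27182818,
            "ticker": "AAPL",
            "sentiment": 0.0,
            "confidence": 0.15,
            "theme": "other",
            "reasoning": "Generic market commentary; AAPL only cross-referenced.",
        },
    ]
}
```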
Persist signals¶
kind python
# @name Persist signals
# Writes the freshly-extracted batch to the ``signals`` table and logs
# the LLM call to the ``costs`` ledger so cost-adjusted P&L has
# something to subtract later.
import hashlib
conn = open_db()
batch = signals_batch or {}
rows = batch.get("signals", []) if isinstance(batch, dict) else []
# Only persist rows whose ticker is in the whitelist. The schema enum
# doesn't constrain ticker strings — the LLM could return anything —
# so this is the last layer of defense before the DB has a row that
# place_orders would try to trade.
allowed = set(Config.TICKER_WHITELIST)
persisted = 0
for row in rows:
ticker = row.get("ticker")
article_id = row.get("article_id")
if ticker not in allowed or article_id is None:
continue
# Deterministic signal_id so re-running the cell is idempotent.
seed = f"{article_id}:{ticker}".encode()
signal_id = hashlib.sha256(seed).hexdigest()[:16]
try:
conn.execute(
"""
INSERT INTO signals
(signal_id, article_id, ticker, sentiment, confidence,
theme, reasoning, input_tokens, output_tokens, model)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
[
signal_id,
article_id,
ticker,
float(row.get("sentiment", 0.0)),
float(row.get("confidence", 0.0)),
str(row.get("theme", "other")),
str(row.get("reasoning", "")),
                None,  # input_tokens — exact counts live on the prompt artifact
                None,  # output_tokens
                None,  # model
],
)
persisted += 1
except Exception:
# Primary-key conflict on re-run — signal already persisted.
pass
# LLM cost accounting. The prompt cell doesn't expose tokens to
# downstream cells directly; we pull them off the signals_batch
# artifact's transform_spec at reconcile time, so here we just log a
# placeholder row and let the reconciliation cell fill in the real
# dollar figure when it inspects the artifact's metadata. For now,
# estimate from the template+response size so the ledger always has
# an entry even if reconciliation is skipped.
approx_in_tokens = int(len(str(unprocessed)) / 4)
approx_out_tokens = int(
len(str(batch)) / 4 if isinstance(batch, dict) else 0
)
approx_cost = estimate_llm_cost(approx_in_tokens, approx_out_tokens)
record_cost(
conn,
source="llm_signals",
usd=approx_cost,
detail={
"rows_in": len(rows),
"persisted": persisted,
"approx_input_tokens": approx_in_tokens,
"approx_output_tokens": approx_out_tokens,
"note": "estimated from template size; exact token counts in artifact",
},
)
# Fresh signals ready for risk_check.
fresh_signals = conn.execute(
"""
SELECT s.signal_id, s.ticker, s.sentiment, s.confidence, s.theme,
s.extracted_at, n.headline
FROM signals s
JOIN news_raw n USING (article_id, ticker)
WHERE s.signal_id NOT IN (SELECT COALESCE(signal_id, '') FROM orders)
AND s.confidence >= ?
AND ABS(s.sentiment) >= ?
ORDER BY s.extracted_at DESC
""",
[Config.MIN_CONFIDENCE, Config.MIN_ABS_SENTIMENT],
).fetchdf()
conn.close()
print(f"persisted {persisted} new signals; {len(fresh_signals)} meet risk thresholds")
fresh_signals
Fetch latest prices¶
kind python
# @name Fetch latest prices
# Pulls daily bars for the watchlist up to "now" and upserts into
# ``prices``. We need a recent close to size positions; the 15-min
# REST delay is fine here — daily rebalancing doesn't need sub-minute
# precision.
import datetime as dt
from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame
conn = open_db()
stock_client, _news = alpaca_data_clients()
start = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=Config.LOOKBACK_DAYS)
request = StockBarsRequest(
symbol_or_symbols=list(Config.TICKER_WHITELIST),
timeframe=TimeFrame.Day,
start=start,
)
bar_response = stock_client.get_stock_bars(request)
# alpaca-py returns a BarSet; .df is a MultiIndex DataFrame keyed by
# (symbol, timestamp). Normalize to rows.
frame = bar_response.df.reset_index()
inserted = 0
for row in frame.itertuples(index=False):
try:
conn.execute(
"""
INSERT INTO prices (ticker, ts, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT DO NOTHING
""",
[
row.symbol,
row.timestamp,
float(row.open),
float(row.high),
float(row.low),
float(row.close),
int(row.volume),
],
)
inserted += 1
except Exception:
pass
record_cost(
conn,
source="alpaca_data",
usd=0.0, # free tier
detail={"bars_fetched": len(frame), "inserted": inserted},
)
# Latest close per ticker — what risk_check uses for sizing.
latest_prices = conn.execute(
"""
SELECT ticker, close, ts AS as_of
FROM prices
WHERE (ticker, ts) IN (
SELECT ticker, MAX(ts) FROM prices GROUP BY ticker
)
ORDER BY ticker
"""
).fetchdf()
conn.close()
print(f"prices: upserted {inserted} bars; latest for {len(latest_prices)} tickers.")
latest_prices
Risk check & trade plan¶
kind python
# @name Risk check & trade plan
# Translates fresh signals into a concrete list of orders after
# applying all hard risk constraints. Nothing here talks to Alpaca —
# output is a DataFrame that place_orders either submits or drops
# on dry-run.
import pandas as pd
conn = open_db()
# --- Current state -----------------------------------------------------
current_positions = conn.execute(
"SELECT ticker, qty, avg_cost FROM positions WHERE qty != 0"
).fetchdf()
open_exposure = 0.0
if len(current_positions):
# Use last known close for exposure math.
current_positions = current_positions.merge(
latest_prices[["ticker", "close"]], on="ticker", how="left"
)
current_positions["notional"] = (
current_positions["qty"] * current_positions["close"]
).abs()
open_exposure = float(current_positions["notional"].sum())
orders_today = conn.execute(
"""
SELECT COUNT(*) FROM orders
WHERE CAST(submitted_at AS DATE) = CURRENT_DATE
"""
).fetchone()[0]
trades_budget = max(0, Config.MAX_DAILY_TRADES - int(orders_today))
# --- Signals → trade plan ---------------------------------------------
#
# Simple rule: one entry per fresh signal, sized proportionally to
# signal strength (sentiment × confidence), capped at
# MAX_POSITION_USD. No pyramiding on existing positions — if we
# already hold the ticker, skip.
trade_plan_rows = []
held = set(current_positions["ticker"]) if len(current_positions) else set()
price_lookup = dict(zip(latest_prices["ticker"], latest_prices["close"], strict=False))
headroom_usd = max(0.0, Config.MAX_TOTAL_EXPOSURE_USD - open_exposure)
for signal in fresh_signals.itertuples(index=False):
if len(trade_plan_rows) >= trades_budget:
break
if signal.ticker in held:
continue
price = price_lookup.get(signal.ticker)
if not price or price <= 0:
continue
strength = float(signal.sentiment) * float(signal.confidence)
if abs(strength) < (Config.MIN_CONFIDENCE * Config.MIN_ABS_SENTIMENT):
continue
# Long on positive, short on negative. Alpaca Basic supports
# shorting on margin; set strength positive-only to disable shorts.
side = "buy" if strength > 0 else "sell"
# Size = |strength| × position cap, further clamped by headroom.
target_usd = min(abs(strength) * Config.MAX_POSITION_USD, headroom_usd)
if target_usd < price:
# Can't afford even one share within limits.
continue
qty = int(target_usd // price)
if qty <= 0:
continue
expected = estimate_trade_cost(qty, side, price)
trade_plan_rows.append(
{
"signal_id": signal.signal_id,
"ticker": signal.ticker,
"side": side,
"qty": qty,
"price_hint": price,
"strength": strength,
"theme": signal.theme,
"expected_cost_usd": expected["total_usd"],
}
)
headroom_usd -= qty * price
if headroom_usd <= 0:
break
# Empty lists produce a DataFrame with zero columns, which trips up
# the downstream filters in place_orders. Pin the schema.
_TRADE_PLAN_COLUMNS = [
"signal_id",
"ticker",
"side",
"qty",
"price_hint",
"strength",
"theme",
"expected_cost_usd",
]
trade_plan = pd.DataFrame(trade_plan_rows, columns=_TRADE_PLAN_COLUMNS)
conn.close()
print(
f"risk_check: {len(fresh_signals)} fresh signals → {len(trade_plan)} proposed "
f"orders. Open exposure ${open_exposure:.2f}, trades budget left {trades_budget}."
)
trade_plan
Place orders, reconcile, report¶
Signal extraction and risk checks land cached artifacts upstream. Below is the execution side: send the trade plan to the broker, reconcile what filled, attribute trading costs, and write the end-of-day P&L into a structured report.
Editing any analysis cell above invalidates the downstream execution cells through provenance, so a re-run never sends stale orders. The very last cell is a prompt cell that summarizes the day's session for human review.
Place orders¶
kind python
# @name Place orders
# Takes the trade_plan from risk_check and either DRY-RUNs it or
# submits each row to Alpaca. Re-running is idempotent — if a
# signal_id is already in ``orders``, we skip it.
#
# The live-mode gate is intentionally verbose. Flipping from paper
# to live takes two separate edits in helpers.py — this cell enforces
# the second one at runtime.
import uuid
from alpaca.trading.enums import OrderSide, TimeInForce
from alpaca.trading.requests import MarketOrderRequest
conn = open_db()
# Resolve mode once so logs are consistent even if Config changes
# mid-run.
mode = Config.MODE
# Hard gate — refuses to trade live without the second key.
if mode == "live" and not Config.I_UNDERSTAND_THIS_IS_REAL_MONEY:
raise RuntimeError(
"Config.MODE is 'live' but Config.I_UNDERSTAND_THIS_IS_REAL_MONEY is False. "
"Set both in helpers.py on purpose before placing real orders."
)
if mode not in ("paper", "live"):
raise ValueError(f"Config.MODE must be 'paper' or 'live', got {mode!r}.")
# Deduplicate against anything already submitted today.
already_submitted = set(
row[0]
for row in conn.execute(
"SELECT signal_id FROM orders WHERE signal_id IS NOT NULL"
).fetchall()
)
if len(trade_plan) == 0 or "signal_id" not in trade_plan.columns:
to_submit = trade_plan.iloc[0:0] # empty but with compatible shape
skipped_dupe = 0
else:
to_submit = trade_plan[~trade_plan["signal_id"].isin(already_submitted)]
skipped_dupe = len(trade_plan) - len(to_submit)
submitted_rows = []
if len(to_submit):
client = alpaca_trading_client()
for row in to_submit.itertuples(index=False):
client_order_id = f"strata-{row.signal_id}-{uuid.uuid4().hex[:6]}"
request = MarketOrderRequest(
symbol=row.ticker,
qty=int(row.qty),
side=OrderSide.BUY if row.side == "buy" else OrderSide.SELL,
time_in_force=TimeInForce.DAY,
client_order_id=client_order_id,
)
try:
alpaca_order = client.submit_order(order_data=request)
except Exception as exc:
conn.execute(
"""
INSERT INTO orders
(order_id, signal_id, ticker, side, qty, mode,
status, expected_cost_usd, alpaca_client_order_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
[
uuid.uuid4().hex,
row.signal_id,
row.ticker,
row.side,
float(row.qty),
mode,
f"submit_failed: {type(exc).__name__}",
float(row.expected_cost_usd),
client_order_id,
],
)
continue
order_id = str(getattr(alpaca_order, "id", uuid.uuid4().hex))
status = str(getattr(alpaca_order, "status", "submitted"))
conn.execute(
"""
INSERT INTO orders
(order_id, signal_id, ticker, side, qty, mode,
status, expected_cost_usd, alpaca_client_order_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
[
order_id,
row.signal_id,
row.ticker,
row.side,
float(row.qty),
mode,
status,
float(row.expected_cost_usd),
client_order_id,
],
)
# Expected-cost is booked to ``costs`` now; reconcile will
# add any delta (realized_cost − expected) after fills land.
record_cost(
conn,
source="trade_expected",
usd=float(row.expected_cost_usd),
detail={
"ticker": row.ticker,
"side": row.side,
"qty": float(row.qty),
"price_hint": float(row.price_hint),
"mode": mode,
},
signal_id=row.signal_id,
order_id=order_id,
)
submitted_rows.append(
{
"order_id": order_id,
"ticker": row.ticker,
"side": row.side,
"qty": int(row.qty),
"status": status,
"expected_cost_usd": float(row.expected_cost_usd),
}
)
conn.close()
import pandas as pd
# Pin columns so daily_note and reconcile see a consistent schema
# even on a no-op run.
_SUBMITTED_COLUMNS = ["order_id", "ticker", "side", "qty", "status", "expected_cost_usd"]
submitted = pd.DataFrame(submitted_rows, columns=_SUBMITTED_COLUMNS)
print(
f"place_orders ({mode}): {len(submitted)} submitted, "
f"{skipped_dupe} skipped (already in orders)."
)
submitted
Reconcile fills¶
kind python
# @name Reconcile fills
# Pulls the authoritative state from Alpaca for every order we've
# submitted in the last 24h, records fills to ``trades``, rebuilds
# ``positions``, and books the slippage delta (realized - expected)
# to the cost ledger.
#
# Depends on ``submitted`` to force DAG ordering after ``place_orders``
# — even when there are zero new submissions we still want to
# reconcile any open orders from prior runs.
import datetime as dt
import uuid
from alpaca.trading.requests import GetOrdersRequest
# Reference submitted so the DAG runs us after place_orders. The
# variable itself is unused — reconcile pulls authoritative state
# from Alpaca regardless of what we submitted this run.
_upstream_gate = submitted # noqa: F841
conn = open_db()
client = alpaca_trading_client()
# Pull every order we've submitted recently. Alpaca returns status in
# one canonical string per order; fill_price / fill_qty land on the
# order object when filled.
request = GetOrdersRequest(
status="all",
after=(dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=3)),
limit=100,
)
alpaca_orders = client.get_orders(filter=request) or []
recorded_fills = 0
for ao in alpaca_orders:
order_id = str(getattr(ao, "id", ""))
if not order_id:
continue
status = str(getattr(ao, "status", ""))
filled_qty = float(getattr(ao, "filled_qty", 0) or 0)
filled_price = getattr(ao, "filled_avg_price", None)
filled_at = getattr(ao, "filled_at", None)
ticker = str(getattr(ao, "symbol", ""))
side = str(getattr(ao, "side", "")).lower()
# Update the orders table's status in case the submit happened in
# a previous run (e.g. DAY order filled overnight).
conn.execute(
"UPDATE orders SET status = ? WHERE order_id = ?",
[status, order_id],
)
if status not in ("filled", "partially_filled"):
continue
if filled_qty <= 0 or filled_price is None or filled_at is None:
continue
# Idempotent: one trade row per (order_id, fill_ts).
existing = conn.execute(
"SELECT 1 FROM trades WHERE order_id = ? AND fill_ts = ?",
[order_id, filled_at],
).fetchone()
if existing:
continue
fill_price = float(filled_price)
realized = estimate_trade_cost(filled_qty, side, fill_price)
trade_id = uuid.uuid4().hex
conn.execute(
"""
INSERT INTO trades
(trade_id, order_id, ticker, side, qty, fill_price, fill_ts, realized_cost_usd)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
[
trade_id,
order_id,
ticker,
side,
filled_qty,
fill_price,
filled_at,
realized["total_usd"],
],
)
# Slippage delta vs the estimate booked by place_orders.
expected_row = conn.execute(
"SELECT expected_cost_usd FROM orders WHERE order_id = ?", [order_id]
).fetchone()
expected = float(expected_row[0]) if expected_row and expected_row[0] else 0.0
delta = realized["total_usd"] - expected
if abs(delta) > 0.001:
record_cost(
conn,
source="trade_slippage_delta",
usd=delta,
detail={
"ticker": ticker,
"side": side,
"qty": filled_qty,
"fill_price": fill_price,
"expected_cost_usd": expected,
"realized_cost_usd": realized["total_usd"],
},
order_id=order_id,
)
recorded_fills += 1
# Rebuild ``positions`` from scratch using signed qty.
conn.execute("DELETE FROM positions")
conn.execute(
"""
INSERT INTO positions (ticker, qty, avg_cost, last_update)
SELECT
ticker,
SUM(CASE WHEN side = 'buy' THEN qty ELSE -qty END) AS qty,
CASE
WHEN SUM(CASE WHEN side = 'buy' THEN qty ELSE 0 END) = 0 THEN 0
ELSE SUM(CASE WHEN side = 'buy' THEN qty * fill_price ELSE 0 END)
/ SUM(CASE WHEN side = 'buy' THEN qty ELSE 0 END)
END AS avg_cost,
CURRENT_TIMESTAMP
FROM trades
GROUP BY ticker
HAVING SUM(CASE WHEN side = 'buy' THEN qty ELSE -qty END) != 0
"""
)
positions_snapshot = conn.execute(
"SELECT ticker, qty, avg_cost FROM positions ORDER BY ticker"
).fetchdf()
conn.close()
print(
f"reconcile: recorded {recorded_fills} new fills; "
f"{len(positions_snapshot)} open positions."
)
positions_snapshot
Cost ledger¶
kind python
# @name Cost ledger
# Aggregates the ``costs`` table so the next cell can subtract the
# right number from P&L. Also produces a human-readable per-source
# breakdown and a running daily total.
#
# Depends on ``positions_snapshot`` so the DAG orders us after
# reconcile — otherwise costs posted during reconciliation
# (slippage deltas) would miss this aggregation.
_upstream_gate = positions_snapshot # noqa: F841
conn = open_db()
cost_by_source = conn.execute(
"""
SELECT source, ROUND(SUM(usd), 4) AS usd, COUNT(*) AS calls
FROM costs
GROUP BY source
ORDER BY usd DESC
"""
).fetchdf()
cost_by_day = conn.execute(
"""
SELECT CAST(ts AS DATE) AS day, ROUND(SUM(usd), 4) AS usd
FROM costs
GROUP BY day
ORDER BY day DESC
LIMIT 14
"""
).fetchdf()
total_cost_usd = float(
conn.execute("SELECT COALESCE(SUM(usd), 0) FROM costs").fetchone()[0]
)
conn.close()
print("=== Cost breakdown by source ===")
print(cost_by_source.to_string(index=False))
print()
print("=== Daily spend (last 14d) ===")
print(cost_by_day.to_string(index=False))
print()
print(f"Total spend to date: ${total_cost_usd:.4f}")
cost_summary = {
"total_usd": round(total_cost_usd, 4),
"by_source": cost_by_source.to_dict(orient="records"),
"by_day": cost_by_day.to_dict(orient="records"),
}
cost_summary
P&L report¶
kind python
# @name P&L report
# The money number: realized P&L minus everything in ``costs``.
# Re-running the notebook over time builds up history here.
#
# References cost_summary + positions_snapshot to lock DAG order
# after costs + reconcile.
_upstream_gate_costs = cost_summary # noqa: F841
_upstream_gate_positions = positions_snapshot # noqa: F841
import pandas as pd
conn = open_db()
# Realized P&L is SUM(sell proceeds) - SUM(buy costs) per ticker —
# only valid for tickers where quantity has round-tripped back to
# zero. Mark unrealized P&L separately against the latest close.
realized_by_ticker = conn.execute(
    """
    SELECT
        ticker,
        SUM(CASE WHEN side = 'sell' THEN qty * fill_price ELSE 0 END)
        - SUM(CASE WHEN side = 'buy' THEN qty * fill_price ELSE 0 END) AS realized_pnl_usd,
        SUM(qty) AS turnover
    FROM trades
    GROUP BY ticker
    -- Restrict to tickers that have round-tripped to flat; for open
    -- positions the buy/sell cash-flow gap is not realized P&L (the
    -- unrealized query below covers those).
    HAVING SUM(CASE WHEN side = 'buy' THEN qty ELSE -qty END) = 0
    ORDER BY realized_pnl_usd DESC
    """
).fetchdf()
unrealized_by_ticker = conn.execute(
"""
WITH latest AS (
SELECT ticker, close
FROM prices
WHERE (ticker, ts) IN (
SELECT ticker, MAX(ts) FROM prices GROUP BY ticker
)
)
SELECT p.ticker,
p.qty,
p.avg_cost,
l.close AS mark_price,
ROUND(p.qty * (l.close - p.avg_cost), 2) AS unrealized_pnl_usd
FROM positions p
LEFT JOIN latest l USING (ticker)
WHERE p.qty != 0
ORDER BY p.ticker
"""
).fetchdf()
total_realized = float(realized_by_ticker["realized_pnl_usd"].sum()) if len(realized_by_ticker) else 0.0
total_unrealized = (
float(unrealized_by_ticker["unrealized_pnl_usd"].sum()) if len(unrealized_by_ticker) else 0.0
)
total_costs = float(conn.execute("SELECT COALESCE(SUM(usd), 0) FROM costs").fetchone()[0])
# The headline metric — does the strategy pay for itself?
cost_adjusted_pnl = total_realized + total_unrealized - total_costs
conn.close()
print("=== Realized P&L by ticker ===")
print(realized_by_ticker.to_string(index=False) if len(realized_by_ticker) else "(no closed trades yet)")
print()
print("=== Unrealized P&L (mark-to-last-close) ===")
print(
unrealized_by_ticker.to_string(index=False) if len(unrealized_by_ticker) else "(no open positions)"
)
print()
print("=== Headline ===")
print(f" Realized P&L: ${total_realized:,.2f}")
print(f" Unrealized P&L: ${total_unrealized:,.2f}")
print(f" Total costs: ${total_costs:,.4f}")
print(f" Cost-adjusted P&L: ${cost_adjusted_pnl:,.2f}")
pnl_summary = {
"realized_usd": round(total_realized, 2),
"unrealized_usd": round(total_unrealized, 2),
"costs_usd": round(total_costs, 4),
"cost_adjusted_pnl_usd": round(cost_adjusted_pnl, 2),
"by_ticker_realized": realized_by_ticker.to_dict(orient="records"),
"by_ticker_unrealized": unrealized_by_ticker.to_dict(orient="records"),
}
pnl_summary
daily_note¶
kind prompt
Prompt cell — response intentionally excluded from export.
# @name daily_note
# @temperature 0.2
# @max_tokens 800
# @system You are a trading journal assistant. Your job is to produce a concise, honest end-of-day reflection on a small paper-trading strategy. Do not sugarcoat — if the strategy is losing money or being eaten by costs, say so directly.
# @output_schema {"type": "object", "properties": {"one_liner": {"type": "string"}, "biggest_winner": {"type": "string"}, "biggest_loser": {"type": "string"}, "cost_health": {"type": "string", "enum": ["green", "yellow", "red"]}, "cost_comment": {"type": "string"}, "lessons": {"type": "array", "items": {"type": "string"}, "minItems": 1, "maxItems": 5}, "tomorrow_watch": {"type": "array", "items": {"type": "string"}}}, "required": ["one_liner", "biggest_winner", "biggest_loser", "cost_health", "cost_comment", "lessons", "tomorrow_watch"]}
# @validate_retries 2
#
# Last cell in the notebook. Produces a structured end-of-day journal
# entry from the P&L summary and cost ledger — the kind of thing
# you'd want to read first the next morning.
Trading day recap. Produce an honest, structured summary.
## P&L
{{ pnl_summary }}
## Costs
{{ cost_summary }}
## Today's orders
{{ submitted }}
## Rules for the journal
- `one_liner` — one sentence summarizing the day. Lead with the
cost-adjusted P&L sign.
- `biggest_winner` / `biggest_loser` — name the ticker and amount;
use "none" if no closed trades yet.
- `cost_health`: "green" when costs are < 10% of gross P&L,
"yellow" at 10–40%, "red" at >40% or when P&L is negative.
- `cost_comment`: one sentence explaining the color.
- `lessons` — 1-5 short imperatives ("cut losers faster", "avoid
low-confidence signals", etc.). If the sample size is too small to
conclude anything, say so in one of the lessons.
- `tomorrow_watch` — specific things to look at tomorrow. Can be
empty if there's nothing actionable yet.
Be terse. No filler.