LLM-Assisted Futures Trading: Using AI for Signal Generation on NinjaTrader 8
Feed live futures data to an LLM and let it generate buy/sell signals executed on NinjaTrader 8 through the CrossTrade API. Complete Python implementation.
Large language models are surprisingly good at reasoning about structured numerical data. Give one a window of recent prices, a volatility reading, your current position, and a set of trading rules, and it can produce a coherent buy/sell/hold signal with a written rationale for the decision. It won't beat a well-tuned quantitative model, but it can serve as an intelligent filter or signal layer that would take weeks to build with traditional rule-based logic.
This tutorial builds a Python system that streams live futures quotes from NinjaTrader 8 through the CrossTrade WebSocket API, packages that data into structured snapshots, sends each snapshot to Claude for analysis, and executes the model's trading signals back through NinjaTrader. The LLM acts as a signal generator — the Python script handles all state management, risk enforcement, and execution.
To be clear about what this is and isn't: the LLM is not an autonomous agent. It doesn't decide when to check prices, which API calls to make, or how to manage order lifecycle. It receives a snapshot, returns a signal, and immediately forgets everything. Your Python script is the brain. The LLM is a consultant you call every 30 seconds.
That said, the practical value is real. You get a reasoning layer that can synthesize price action, volatility, position state, and P&L into a trading decision with a human-readable explanation — and you can tune its behavior by editing a system prompt instead of rewriting indicator logic.
Prerequisites
This tutorial assumes you've already got the basic CrossTrade API setup working: NinjaTrader 8 running with the add-on connected, your Bearer token from the My Account page, and Python 3.10+. If you haven't done that yet, start with How to Build a Python Trading Bot for NinjaTrader 8 — it covers the foundations.
You'll also need an API key from an LLM provider. This tutorial uses Anthropic's Claude API, but the pattern works with any provider that accepts structured prompts and returns JSON.
pip install websockets anthropic
How It Works
The system runs a simple loop. Every N seconds, the Python script builds a market snapshot from accumulated WebSocket data, sends it to the LLM with a system prompt that defines the trading rules, parses the structured response, and executes the signal if it calls for action. Between evaluations, the script silently collects price ticks and P&L updates.
The LLM is stateless. Every call includes the full context it needs. There's no conversation history, no memory between evaluations. This is a deliberate design choice: it makes every signal independently auditable. You can open the log, look at any single snapshot/response pair, and understand exactly what data the model saw and why it made its decision.
Configuration
Keep all tunable parameters in one place:
import os
# CrossTrade
CT_TOKEN = os.environ.get("CROSSTRADE_TOKEN", "your-secret-key")
CT_WS_URL = "wss://app.crosstrade.io/ws/stream"
ACCOUNT = "Sim101"
INSTRUMENT = "ES 09-26"
# LLM
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "your-anthropic-key")
MODEL = "claude-sonnet-4-20250514"
# Trading rules
CHECK_INTERVAL = 30 # Seconds between LLM evaluations
MAX_POSITION_SIZE = 1 # Max contracts
MAX_DAILY_LOSS = -500.0 # Dollar loss limit — stop trading
PRICE_WINDOW = 20 # Number of prices to include in context
Keep MAX_POSITION_SIZE at 1 and ACCOUNT on Sim101 while testing. These are your guardrails.
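One way to make these guardrails harder to bypass by accident is a small startup check. The sketch below is illustrative only: `guardrail_warnings` is a hypothetical helper, and it assumes simulation accounts carry a "Sim" prefix as Sim101 does, which may not hold for custom account names.

```python
def guardrail_warnings(account: str, max_position_size: int) -> list:
    """Return human-readable warnings for settings that look unsafe for testing."""
    warnings = []
    # Assumption: simulation accounts are named with a "Sim" prefix, as in Sim101
    if not account.lower().startswith("sim"):
        warnings.append(f"ACCOUNT is '{account}', which does not look like a sim account")
    if max_position_size > 1:
        warnings.append(
            f"MAX_POSITION_SIZE is {max_position_size}; keep it at 1 while testing"
        )
    return warnings
```

Calling `guardrail_warnings(ACCOUNT, MAX_POSITION_SIZE)` before connecting and printing anything it returns would surface a misconfiguration before the first order goes out.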
The Market State Collector
The script accumulates market data over time and packages it into clean snapshots for the LLM:
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class MarketState:
    prices: list = field(default_factory=list)
    position: dict | None = None          # None means flat
    session_pnl: float = 0.0
    last_update: datetime | None = None

    def add_price(self, price: float):
        # Keep a rolling buffer of the 100 most recent prices
        self.prices.append(price)
        if len(self.prices) > 100:
            self.prices.pop(0)
        self.last_update = datetime.now()

    def recent_prices(self, n: int = PRICE_WINDOW) -> list:
        # Slicing returns the whole list when it holds fewer than n items
        return self.prices[-n:]

    def price_change(self) -> float:
        # Net move across the window: newest price minus oldest
        recent = self.recent_prices()
        if len(recent) < 2:
            return 0.0
        return recent[-1] - recent[0]

    def volatility(self) -> float:
        # Mean absolute tick-to-tick move across the window
        recent = self.recent_prices()
        if len(recent) < 5:
            return 0.0
        diffs = [abs(recent[i] - recent[i-1]) for i in range(1, len(recent))]
        return sum(diffs) / len(diffs)

    def snapshot(self) -> str:
        recent = self.recent_prices()
        if not recent:
            return "No market data available yet."
        lines = []
        lines.append(f"Instrument: {INSTRUMENT}")
        lines.append(f"Current price: {recent[-1]:.2f}")
        lines.append(f"Prices (last {len(recent)} updates, ~1 second apart): "
                     f"{', '.join(f'{p:.2f}' for p in recent)}")
        lines.append(f"Price change over window: {self.price_change():+.2f} points")
        lines.append(f"Average tick-to-tick movement: {self.volatility():.2f} points")
        lines.append(f"Session P&L: ${self.session_pnl:+.2f}")
        if self.position:
            lines.append(f"Current position: {self.position['direction']} "
                         f"{self.position['quantity']} @ {self.position['entry']:.2f}")
            lines.append(f"Unrealized P&L: ${self.position.get('unrealized', 0.0):+.2f}")
        else:
            lines.append("Current position: FLAT")
        return "\n".join(lines)
The snapshot() method produces everything the LLM needs in a single string: current price, recent price history with explicit timeframe context, trend, volatility, position, and P&L.
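To make the trend and volatility figures concrete, here is a standalone restatement of the two calculations applied to a made-up five-price window. The function names and the sample prices are illustrative, not part of the tutorial's script.

```python
def price_change(prices: list) -> float:
    """Net move across the window: last price minus first price."""
    if len(prices) < 2:
        return 0.0
    return prices[-1] - prices[0]

def mean_abs_move(prices: list) -> float:
    """Average absolute difference between consecutive prices."""
    if len(prices) < 5:  # same minimum sample size the collector uses
        return 0.0
    diffs = [abs(b - a) for a, b in zip(prices, prices[1:])]
    return sum(diffs) / len(diffs)

# Hypothetical ES prices, one tick (0.25) granularity
window = [5000.00, 5000.50, 5000.25, 5001.00, 5000.75]
print(price_change(window))   # 0.75
print(mean_abs_move(window))  # 0.4375
```

The window drifted up 0.75 points overall while moving 0.4375 points per update on average, so the net move is less than two average ticks of noise. Those two numbers together are what the model uses to judge whether a move is momentum or chop.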
The Signal Generator
This is where the LLM does its work. The system prompt defines the trading personality, risk constraints, and expected output format. The critical design choice is constraining the response to strict JSON — no free-form analysis, no hedging, just a signal and a one-sentence reason.
import anthropic
import json

client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)

SYSTEM_PROMPT = """You are a futures trading signal generator for ES (E-mini S&P 500).

You receive a market snapshot every 30 seconds containing recent prices, volatility, position state, and session P&L. Your job is to output a trading signal.

RULES:
- Valid signals: BUY (enter long, or exit a short), SELL (enter short, or exit a long), HOLD (no action).
- Maximum position: 1 contract long or 1 contract short.
- If already LONG, SELL means exit. If already SHORT, BUY means exit.
- Never signal a reversal. To go from LONG to SHORT, signal SELL to exit first, then SELL again on a future evaluation to enter short.
- If daily P&L has hit the loss limit, always signal HOLD.

GUIDELINES:
- Look for short-term momentum in the price window.
- High volatility = wider risk, be more cautious.
- Protect profits: if unrealized P&L is positive and momentum is reversing, consider exiting.
- Cut losses: if unrealized P&L is significantly negative and worsening, exit.
- When uncertain, HOLD. Missing a trade costs less than forcing a bad one.

RESPOND WITH ONLY THIS JSON, no other text:
{
  "signal": "BUY" | "SELL" | "HOLD",
  "reason": "One sentence explaining your reasoning"
}"""

def get_signal(market_snapshot: str) -> dict:
    try:
        response = client.messages.create(
            model=MODEL,
            max_tokens=200,
            system=SYSTEM_PROMPT,
            messages=[
                {"role": "user", "content": f"Market snapshot:\n\n{market_snapshot}"}
            ]
        )
        text = response.content[0].text.strip()
        # Strip Markdown code fences in case the model wraps its JSON in them
        text = text.replace("```json", "").replace("```", "").strip()
        decision = json.loads(text)
        if decision.get("signal") not in ("BUY", "SELL", "HOLD"):
            return {"signal": "HOLD", "reason": "Invalid LLM response, defaulting to HOLD"}
        # The caller prints the reason, so make sure the key exists
        decision.setdefault("reason", "")
        return decision
    except Exception as e:
        # Any API, network, or parsing failure falls back to HOLD
        print(f"Signal generation error: {e}")
        return {"signal": "HOLD", "reason": f"Error: {e}"}
The fallback behavior is important: any parsing failure, API timeout, or malformed response produces a HOLD signal. When the signal layer breaks, the safest action is no action.
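The fail-safe parsing can also be isolated into a function that is easy to unit test without calling the API. The following is a sketch of one alternative approach, not the tutorial's code: `parse_signal` is a hypothetical helper that extracts the outermost JSON object instead of stripping code fences by string replacement.

```python
import json

VALID_SIGNALS = ("BUY", "SELL", "HOLD")

def parse_signal(text: str) -> dict:
    """Parse model output into a signal dict, defaulting to HOLD on any problem."""
    try:
        # Take the span from the first '{' to the last '}' so that stray
        # prose or Markdown code fences around the JSON are ignored
        start = text.index("{")
        end = text.rindex("}") + 1
        decision = json.loads(text[start:end])
        if not isinstance(decision, dict) or decision.get("signal") not in VALID_SIGNALS:
            raise ValueError("missing or unknown signal")
        return {"signal": decision["signal"], "reason": str(decision.get("reason", ""))}
    except ValueError as exc:
        # str.index, str.rindex, and json.loads all raise ValueError subclasses
        return {"signal": "HOLD", "reason": f"Unparseable response: {exc}"}
```

Because every failure path returns HOLD, the function can be fed arbitrary text in tests, such as empty strings, prose, or truncated JSON, to confirm that nothing other than a well-formed BUY or SELL can ever reach the execution layer.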
The Execution Layer
The execution layer translates signals into CrossTrade WebSocket RPC calls, with safety cross-checks against the current position state:
async def execute_signal(ws, signal: str, state: MarketState):
    if signal == "HOLD":
        return

    # SELL while long: exit the long
    if signal == "SELL" and state.position and state.position["direction"] == "Long":
        msg = {
            "action": "rpc",
            "id": "exit-long",
            "api": "ClosePosition",
            "args": {"account": ACCOUNT, "instrument": INSTRUMENT}
        }
        await ws.send(json.dumps(msg))
        print(" → Closing LONG position")

    # BUY while short: exit the short
    elif signal == "BUY" and state.position and state.position["direction"] == "Short":
        msg = {
            "action": "rpc",
            "id": "exit-short",
            "api": "ClosePosition",
            "args": {"account": ACCOUNT, "instrument": INSTRUMENT}
        }
        await ws.send(json.dumps(msg))
        print(" → Closing SHORT position")

    # BUY while flat: enter long
    elif signal == "BUY" and not state.position:
        msg = {
            "action": "rpc",
            "id": "enter-long",
            "api": "PlaceOrder",
            "args": {
                "account": ACCOUNT,
                "instrument": INSTRUMENT,
                "action": "Buy",
                "orderType": "Market",
                "quantity": MAX_POSITION_SIZE,
                "timeInForce": "Gtc"
            }
        }
        await ws.send(json.dumps(msg))
        print(f" → Entering LONG {MAX_POSITION_SIZE} contract(s)")

    # SELL while flat: enter short
    elif signal == "SELL" and not state.position:
        msg = {
            "action": "rpc",
            "id": "enter-short",
            "api": "PlaceOrder",
            "args": {
                "account": ACCOUNT,
                "instrument": INSTRUMENT,
                "action": "Sell",
                "orderType": "Market",
                "quantity": MAX_POSITION_SIZE,
                "timeInForce": "Gtc"
            }
        }
        await ws.send(json.dumps(msg))
        print(f" → Entering SHORT {MAX_POSITION_SIZE} contract(s)")

    # Anything else (for example BUY while already long) is ignored
    else:
        print(f" → Signal {signal} doesn't apply to current state, skipping")
The cross-check logic prevents nonsensical orders. If the LLM says BUY but you're already long, nothing happens. This layer exists because the LLM occasionally misreads position state despite it being stated explicitly in the snapshot. Trust but verify.
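The cross-check amounts to a small decision table over the signal and the current position. As a sketch, the same mapping can be written as a pure function that is testable without a WebSocket connection; `resolve_action` and its action names are hypothetical and not part of the CrossTrade API.

```python
from typing import Optional

def resolve_action(signal: str, direction: Optional[str]) -> str:
    """Map (signal, position direction) to an action name.

    direction is "Long", "Short", or None when flat.
    """
    table = {
        ("BUY", None): "enter_long",
        ("SELL", None): "enter_short",
        ("SELL", "Long"): "close",
        ("BUY", "Short"): "close",
    }
    # Every other combination, including HOLD, results in no order
    return table.get((signal, direction), "skip")
```

Writing it this way makes the full set of permitted transitions visible at a glance: four combinations send an order, and everything else is skipped.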
The Main Loop
Everything connects in a single async event loop:
import asyncio
import websockets

def handle_message(data: dict, state: MarketState):
    # Fold one incoming WebSocket message into the shared state
    # Collect market data
    if data.get("type") == "marketData":
        for quote in data["quotes"]:
            if quote["instrument"] == INSTRUMENT:
                state.add_price(quote["last"])
    # Collect P&L
    elif data.get("type") == "pnlUpdate":
        for acct in data.get("accounts", []):
            if acct["name"] == ACCOUNT:
                pnl = acct.get("pnl", {})
                state.session_pnl = pnl.get("totalNet", 0.0)
    # Handle RPC responses (order fills, position checks)
    elif "id" in data:
        result = data.get("data", data)
        print(f" RPC [{data['id']}]: {result}")
        # Update position state from position queries
        if data["id"] in ("check-position", "pre-eval-position"):
            pos_data = data.get("data", {})
            if (pos_data and pos_data.get("marketPosition")
                    and pos_data["marketPosition"] != "Flat"):
                state.position = {
                    "direction": pos_data["marketPosition"],
                    "quantity": pos_data.get("quantity", 0),
                    "entry": pos_data.get("averagePrice", 0.0),
                    "unrealized": pos_data.get("unrealizedPnl", 0.0)
                }
            else:
                state.position = None

async def refresh_position(ws, state: MarketState, timeout: float = 2.0) -> bool:
    # Request the current position, then keep processing incoming messages
    # until the matching reply arrives. A plain sleep is not enough here:
    # replies are only applied to state when this code reads them.
    await ws.send(json.dumps({
        "action": "rpc",
        "id": "pre-eval-position",
        "api": "GetPosition",
        "args": {"account": ACCOUNT, "instrument": INSTRUMENT}
    }))
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while (remaining := deadline - loop.time()) > 0:
        try:
            raw = await asyncio.wait_for(ws.recv(), timeout=remaining)
        except asyncio.TimeoutError:
            break
        data = json.loads(raw)
        handle_message(data, state)
        if data.get("id") == "pre-eval-position":
            return True
    return False

async def run():
    state = MarketState()
    last_eval = 0.0
    headers = {"Authorization": f"Bearer {CT_TOKEN}"}
    loop = asyncio.get_running_loop()

    print("=" * 60)
    print("LLM Signal Generator starting")
    print(f"Instrument: {INSTRUMENT} | Account: {ACCOUNT}")
    print(f"Evaluation interval: {CHECK_INTERVAL}s")
    print(f"Max daily loss: ${MAX_DAILY_LOSS}")
    print(f"Model: {MODEL}")
    print("=" * 60)

    # websockets 14+ names this keyword additional_headers;
    # older releases call it extra_headers
    async with websockets.connect(CT_WS_URL, additional_headers=headers) as ws:
        await ws.send(json.dumps({
            "action": "subscribe",
            "instruments": [INSTRUMENT]
        }))
        await ws.send(json.dumps({
            "action": "streamPnl",
            "enabled": True
        }))
        print("Connected. Collecting market data...\n")

        async for message in ws:
            handle_message(json.loads(message), state)
            now = loop.time()

            # Evaluate on interval, once the price window is full
            if now - last_eval < CHECK_INTERVAL or len(state.prices) < PRICE_WINDOW:
                continue
            last_eval = now

            # Enforce daily loss limit
            if state.session_pnl <= MAX_DAILY_LOSS:
                print(f"\nDaily loss limit reached (${state.session_pnl:.2f}). Paused.")
                continue

            # Refresh position state so the snapshot is not stale
            if not await refresh_position(ws, state):
                print(" Position refresh timed out. Skipping this evaluation.")
                continue

            # Build snapshot, get signal, execute
            snapshot = state.snapshot()
            print(f"\n--- Evaluation at {datetime.now().strftime('%H:%M:%S')} ---")
            print(snapshot)
            # get_signal blocks on a network call, so run it in a worker thread
            signal = await asyncio.to_thread(get_signal, snapshot)
            print(f"\nSignal: {signal['signal']} - {signal['reason']}")
            await execute_signal(ws, signal["signal"], state)

            # Log for post-session review
            with open("signal_log.jsonl", "a") as f:
                f.write(json.dumps({
                    "time": datetime.now().isoformat(),
                    "snapshot": snapshot,
                    "signal": signal,
                    "session_pnl": state.session_pnl
                }) + "\n")

if __name__ == "__main__":
    asyncio.run(run())
Every evaluation prints the full snapshot and signal with reasoning. Every decision is logged to signal_log.jsonl for post-session review.
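For the post-session review, a few lines of Python are enough to get a first summary of the log. The sketch below assumes the entry layout written by the main loop (a `signal` object containing a `signal` string); `summarize_signals` is a hypothetical helper name.

```python
import json
from collections import Counter

def summarize_signals(lines) -> Counter:
    """Count BUY/SELL/HOLD decisions in an iterable of JSONL lines."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines in the log
        entry = json.loads(line)
        counts[entry["signal"]["signal"]] += 1
    return counts
```

Passing an open file works because file objects iterate line by line: open `signal_log.jsonl` and hand the handle to `summarize_signals`. A session that is almost entirely HOLD, or one that alternates BUY and SELL on consecutive evaluations, is usually the first thing worth investigating in the prompt.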
Tuning the System Prompt
The system prompt controls everything about how the LLM interprets market data. Small wording changes produce meaningfully different behavior. A few directions worth experimenting with:
Pre-compute indicators in Python. You can ask the LLM to calculate RSI from raw prices, but it's more reliable to calculate it in your snapshot() method and include the result as a field. The model is better at interpreting indicators than computing them. Add RSI, VWAP, or a Bollinger Band reading to the snapshot and reference them in the prompt guidelines.
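As an illustration of pre-computing an indicator, here is a minimal RSI sketch. It uses plain averages of gains and losses over the last `period` changes (sometimes called Cutler's RSI), not Wilder's smoothed version that most charting platforms display, so its values will differ from NinjaTrader's built-in indicator. The function name and the 50.0 value for a completely flat window are choices made for this example.

```python
from typing import Optional

def simple_rsi(prices: list, period: int = 14) -> Optional[float]:
    """Simple-average RSI over the last `period` price changes.

    Returns None until there are at least period + 1 prices.
    """
    if len(prices) < period + 1:
        return None
    window = prices[-(period + 1):]
    changes = [b - a for a, b in zip(window, window[1:])]
    avg_gain = sum(c for c in changes if c > 0) / period
    avg_loss = sum(-c for c in changes if c < 0) / period
    if avg_loss == 0:
        # No down moves: 100 if anything rose, neutral 50 if the window was flat
        return 100.0 if avg_gain > 0 else 50.0
    rs = avg_gain / avg_loss
    return 100.0 - 100.0 / (1.0 + rs)
```

The result can be appended to the snapshot as a single labeled line, with a matching guideline in the system prompt explaining how the model should weigh it.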
Include recent signals. Append the last 3-5 signals to the snapshot so the model has context about its own recent output. This reduces flip-flopping where the model enters and exits on consecutive evaluations. Add a recent_signals list to the MarketState class and include it in the snapshot string.
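A bounded history of recent signals could look like the following sketch. `SignalHistory` is a hypothetical standalone class used here for illustration; the same idea can live directly on MarketState as a recent_signals field.

```python
from collections import deque

class SignalHistory:
    """Keeps the most recent signals and renders them as one snapshot line."""

    def __init__(self, maxlen: int = 5):
        # A deque with maxlen discards the oldest entry automatically
        self._items = deque(maxlen=maxlen)

    def record(self, time_str: str, signal: str):
        self._items.append((time_str, signal))

    def as_line(self) -> str:
        if not self._items:
            return "Recent signals: none"
        rendered = ", ".join(f"{t} {s}" for t, s in self._items)
        return f"Recent signals (oldest first): {rendered}"
```

Recording each signal after execution and appending `as_line()` to the snapshot keeps the model stateless while still showing it its own recent decisions.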
Add regime context. A line like "Current session volatility is 1.5x the 20-day average" or "Market opened gap-up and has been trending" gives the model macro context it can't derive from 20 ticks of data.
Vary the evaluation interval. 30 seconds is frequent for a position trader, slow for a scalper. Match the interval to the strategy timeframe. For a 5-minute bar strategy, 300 seconds between evaluations is more appropriate. For a system that reacts to momentum bursts, 10-15 seconds (with rate limits in mind) makes more sense.
From Signal Generator to Full Agent
This architecture is deliberately simple: one data source, one LLM call, one signal, one execution path. If you want the LLM to go beyond signal generation — autonomously deciding which API calls to make, when to check positions, when to pull historical data, how to manage order lifecycle — that's a fundamentally different architecture using tool-use capabilities. We cover that in How to Build an AI Trading Agent for NinjaTrader with the CrossTrade API.
Running in Production
When you're ready to move beyond sim, the transition is straightforward. Change ACCOUNT to your live account name. Deploy to a CrossTrade VPS so the script runs uninterrupted during market hours. Set server-side risk limits using CrossTrade's Account Manager monitors as a backstop independent of your script's own limits. And review signal_log.jsonl after every session — the log is your audit trail for understanding what the model saw and why it acted.
The full API reference is at docs.crosstrade.io. Questions or issues? Hit us up on Discord.
