The model for returns and auxiliary series of predictors is not necessarily correctly specified by a multivariate normal sequence, but the important thing for us is to be able to incorporate auxiliary predictor sequences directly into an HMM and then use the conditional distributions to predict direction and large moves. I have obtained good results on prediction using machine learning algorithms such as SVM. Conceptually, the HMM framework is appealing for many reasons, including a natural explanation of the leptokurtosis of return distributions as well as the idea that the hidden states represent unobserved economic and other factors. Here is the code.

import argparse
import talib
import pandas as pd
from equity_utils import *
import numpy as np
from scipy.stats import nanmean, nanstd
from sklearn import preprocessing
from pandas.io.data import DataReader
from equity_utils import *
import datetime
import sys
import scipy
import ghmm
# --- Script setup: load the CBOE option symbol universe and parse CLI args ---
# One ticker per line; a context manager closes the file even if reading
# fails (the original leaked the handle on error).
symbols = []
with open('symbols-CBOE-opt.txt', 'r') as f:
    for line in f:
        symbols.append(line.rstrip())
# Keep only stocks with history back to 1999-01-01 (project helper).
symbols = subselect_stocks(symbols, datetime.date(1999, 1, 1))
# NOTE(review): the original description ("language triple word
# probabilities") was copy-pasted from an unrelated script; corrected here.
parser = argparse.ArgumentParser(description='Trading strategy using an HMM over returns and technical indicators')
parser.add_argument('--ticker')
args = parser.parse_args()
def print_conditionals(T, S, t=0, label='RSI'):
    """Print (mean, std) of T conditioned on the sign of S beyond threshold t.

    Two pairs are reported: entries of T where S > t, and where S < -t.
    NaNs are ignored.  Uses np.nanmean/np.nanstd because
    scipy.stats.nanmean/nanstd were removed from SciPy; scipy's nanstd
    defaulted to the unbiased (N-1) estimator, so ddof=1 preserves the
    original output exactly.
    """
    # print() with a single argument is valid in both Python 2 and 3.
    print('%s (%f,%f) (%f,%f)' % (label,
                                  np.nanmean(T[S > t]), np.nanstd(T[S > t], ddof=1),
                                  np.nanmean(T[S < -t]), np.nanstd(T[S < -t], ddof=1)))
def positive_probability_HMM(matrices, last_state, positionOfReturns):
    """Probability that the next return is positive, given the current state.

    Parameters
    ----------
    matrices : (A, B, pi) as produced by ghmm's asMatrices(): A is the
        state-transition matrix, B[s] = (mean vector, flattened covariance)
        for state s, pi the initial state distribution.
    last_state : index of the current Viterbi state.
    positionOfReturns : index of the return component in the feature vector.
        NOTE(review): the conditional slices below hard-code index
        n_features-1, so this must equal n_features-1 — TODO confirm.

    Returns
    -------
    float: expectation over successor states of P(return > 0), weighted by
    pi[i] * A[last_state][i].
    """
    A = matrices[0]
    B = matrices[1]
    pi = matrices[2]
    A_row = A[last_state]
    muv, sigmav = B[last_state]
    n_features = len(muv)
    sigmaM = np.array(sigmav).reshape(n_features, n_features)
    # Marginal parameters of the return component.
    mu0 = muv[positionOfReturns]
    sigma0 = sigmaM[positionOfReturns, positionOfReturns]
    # Gaussian conditioning of the return on the remaining features.
    mu2 = np.array(muv)[:(n_features - 1)]
    sigma22 = sigmaM[:(n_features - 1), :(n_features - 1)]
    sigma12 = sigmaM[(n_features - 1), :(n_features - 1)]
    sigma21 = sigmaM[:(n_features - 1), (n_features - 1)]
    # Schur complement: the conditional VARIANCE of the return.
    var = sigma0 - np.dot(sigma12, np.dot(np.linalg.inv(sigma22), sigma21))
    # NOTE(review): the standard conditional mean is
    # mu0 + sigma12 inv(sigma22) (x2 - mu2); this uses mu2 where the
    # centered observation (x2 - mu2) would normally appear, i.e. no
    # observed predictor values are plugged in.  Preserved as-is pending
    # clarification of intent.
    mu = mu0 + np.dot(sigma12, np.dot(np.linalg.inv(sigma22), mu2))
    # BUG FIX: norm.cdf's `scale` parameter is a standard deviation; the
    # original passed the variance directly.  Take the square root.
    tail = 1.0 - scipy.stats.norm.cdf(0.0, loc=mu, scale=np.sqrt(var))
    # Straightforward expectation over possible successor states; the tail
    # probability is state-invariant here, so it is hoisted out of the loop.
    p = 0.0
    for i in range(len(A_row)):
        p += pi[i] * A_row[i] * tail
    return p
def score_hmm(hmm, X, y):
    """Directional hit rate of the HMM on (X, y).

    Decodes the Viterbi state path for X, converts each state's predicted
    probability of a positive return into a sign via sign(p - 0.5), and
    returns the fraction of predictions matching the signs in y
    (0.0 for an empty path).
    """
    domain = ghmm.Float()
    seq = ghmm.EmissionSequence(domain, X.flatten().tolist())
    path, _ = hmm.viterbi(seq)
    mats = hmm.asMatrices()
    # Returns are the last column of the feature matrix.
    ret_col = len(X[0]) - 1
    hits = 0
    total = 0
    for idx, state in enumerate(path):
        prob_up = positive_probability_HMM(mats, state, ret_col)
        if np.sign(prob_up - 0.5) == y[idx]:
            hits += 1
        total += 1
    return float(hits) / float(total) if total else 0.0
def hmm_test_performance(symbol):
    """Train a 5-state multivariate-Gaussian HMM on technical indicators and
    lagged returns for `symbol`, and score directional accuracy.

    Reads daily OHLCV data from eod/<symbol>.csv, augments it with lagged
    SPY returns, builds a 7-feature matrix (RSI, MACD, ADX, OBV, MFI,
    lagged own return, lagged SPY return), fits the HMM on the first
    `train_rows` rows via Baum-Welch, and returns
    (train_accuracy, test_accuracy) each rounded to 3 decimals.
    """
    fname = 'eod/' + symbol + '.csv'
    data = pd.read_csv(fname, skiprows=1,
                       names=['Date', 'Open', 'High', 'Low', 'Close', 'Volume'],
                       na_values=['-'])
    # NOTE(review): the original called data.set_index('Date') and discarded
    # the result (set_index is not in-place), making it a no-op; later code
    # reads data['Date'] as a column, so the call is dropped rather than
    # made in-place.
    rets = np.log(data['Close'] / data['Close'].shift(1))
    # Renamed from `open`, which shadowed the builtin; kept (unused below)
    # in case fill_nan has side effects — TODO confirm and remove.
    open_px = fill_nan(data['Open'])
    high = fill_nan(data['High'])
    low = fill_nan(data['Low'])
    close = fill_nan(data['Close'])
    volume = fill_nan(data['Volume']).astype(float)
    # Reference market returns (SPY) over the same date span.
    start = datetime.datetime.strptime(data['Date'].values[0], '%Y-%m-%d')
    finish = datetime.datetime.strptime(data['Date'].values[-1], '%Y-%m-%d')
    spydata = DataReader('SPY', 'google', start, finish)
    spyrets = np.log(spydata['Close'] / spydata['Close'].shift(1)).values
    # Technical-analysis indicators on which to condition returns.
    t1 = talib.RSI(np.array(close.values))
    t2, _, _ = talib.MACD(np.array(close.values))
    t3 = talib.ADX(np.array(high.values), np.array(low.values),
                   np.array(close.values))
    t4 = talib.OBV(np.array(close.values), np.array(volume.values))
    t5 = talib.MFI(np.array(high.values), np.array(low.values),
                   np.array(close.values), np.array(volume.values))
    # NOTE(review): spyrets comes from a different trading calendar than the
    # local file; rows are matched by position, not by date — verify.
    X = []
    y = []
    N = min(len(spyrets), len(t1), len(t2), len(t3), len(t4), len(t5))
    for i in range(1, N):
        x = [t1[i], t2[i], t3[i], t4[i], t5[i], rets[i - 1], spyrets[i - 1]]
        # Indicator warm-up periods produce NaNs; skip incomplete rows.
        if not np.isnan(np.sum(x)):
            X.append(x)
            y.append(rets[i])
    # BUG FIX: train_rows was defined but the slices hard-coded 1000;
    # the split now actually follows the variable.
    train_rows = 1000
    X_scaled = preprocessing.scale(np.array(X))
    Xtrain = X_scaled[:train_rows, :]
    Xtest = X_scaled[train_rows:, :]
    y_train = y[:train_rows]
    y_test = y[train_rows:]
    # Flat-prior 5-state HMM with standard-normal emissions as the
    # Baum-Welch starting point.
    domain = ghmm.Float()
    n_states = 5
    pi = [1.0 / n_states] * n_states
    A = [[1.0 / n_states] * n_states] * n_states
    n_feature = Xtrain.shape[1]
    mu0 = [0.0] * n_feature
    sigma0 = np.eye(n_feature).flatten().tolist()
    B = [[mu0, sigma0, [1.0]]] * n_states
    S = ghmm.EmissionSequence(domain, Xtrain.flatten().tolist())
    hmm = ghmm.HMMFromMatrices(domain,
                               ghmm.MultivariateGaussianDistribution(domain),
                               A, B, pi)
    hmm.baumWelch(S)
    accuracy_train = score_hmm(hmm, Xtrain, np.sign(y_train))
    accuracy_test = score_hmm(hmm, Xtest, np.sign(y_test))
    return (np.round(accuracy_train, 3),
            np.round(accuracy_test, 3))
# --- Driver: evaluate every symbol and append train/test accuracy to CSV ---
# `with` guarantees the results file is flushed and closed even if a
# symbol's evaluation raises (the original leaked the handle on error).
with open('results.csv', 'w') as output:
    for s in symbols:
        # Progress goes to stderr so it never mixes into redirected output.
        sys.stderr.write(s)
        sys.stderr.write('\n')
        (tr, te) = hmm_test_performance(s)
        output.write('%s,%f,%f\n' % (s, tr, te))

Read Full Post »