Monday, November 8, 2010

Student's t-test again 6

Code

This is the sixth in a series of posts about Student's t test. The previous posts are here, here, here, here, and here.

In this post I'm going to talk about how I would test my home-grown functions to do the t-test in Python. I'm only going to do this for the one-sample test, because I'm running out of energy. But you could easily extend this to the other functions.

We could check using SciPy if it is installed, and that would be easy. But I'm going to use R, partly because it's the gold standard for statistics, and partly because it's a useful technique that I've used in my real work. While we could use RPy as I mentioned the other day, here I am going to use R to generate the sample data, run the test of interest and then write the results to disk.

What we're going to do is run R in batch mode. If it gives an error, which frequently happens when building such a test, we want to show the error. We also clean up extra files when we're done. The code to do this has been added to utils.py and is given below. It should be self-explanatory.

When we run R this way, we feed it a text file with the commands to run. This file is constructed by the function write_R_code. It's not pretty, but I hope you get the idea. Perhaps a better method, which I've also used, is to write the code to a text file, then load it into Python, substitute the file names and specific functions etc., then write that to disk as the file to give to R.

In the last step, we load the results that R has given us and evaluate how well our functions do on the same data. As you can see, we match very well as a rule, but have rounding errors at the extremes of the t-distribution. I believe this could be fixed by extending the range over which we built the distribution (see here). I think it looks pretty good.

[UPDATE: Just to make it clear what we did: we do 1000 runs and look at the details for the first five, then test the rest for deviation from the R results by more than ε = 0.0001. ]

Output:

run
(1626, 0)
clean
.RCode removed
.RCode.Rout removed
.RData removed
.RHistory not found

test results:
N = 1000
R gave: t = -1.858651
I get: t = -1.85865011386

R gave: t = -4.250644
I get: t = -4.25063601226

R gave: t = -1.743078
I get: t = -1.74307933578

R gave: t = 0.1286087
I get: t = 0.128612751636

R gave: t = -9.36424
I get: t = -9.36429217504

error with:
[ 8.946167 8.941678 8.940817] -1843.026 -1843.23012185

error with:
[ 8.895525 8.686258 8.761813] -52.60937 -52.6095381366

error with:
[ 10.54477 10.42989 10.66126] -21.77912 -21.7796812829

error with:
[ 11.05575 10.89208 10.68675] -10.50883 -10.5090054298

error with:
[ 10.02618 10.07471 10.16116] -48.45138 -48.4527009573


R_test.py

import numpy as np
from t_test import one_sample_t
import utils

def write_R_code(D):
L = [ 'fn <- file("' + D['results_fn'] + '","w")',
'n = ' + str(D['n']),
'mu = ' + str(D['mu']),
'sigma = ' + str(D['sigma']),
'for (i in 1:' + str(D['N']) + ') {',
'A = rnorm(n,mu,sigma)',
'result = ' + D['func'] + str(D['mu_est']) + ')',
'cat(A,file = fn,sep = "\n")',
'cat(result$statistic,file = fn,sep="\n")',
'cat("\n",file = fn)',
'}',
'close(fn)'
]
FH = open(D['code_fn'], 'w')
FH.write('\n'.join(L))
FH.close()

def compare_to_my_result(L):
print 'test results:'
print 'N = ', len(L)
for i,e in enumerate(L):
A = np.array(e[:-1])
result = one_sample_t(A,mu=12)
if i < 5:
print 'R gave: t =', e[-1]
print 'I get: t =', result[0]
print
else:
if e[-1] - result[0] > 0.0001:
print 'error with:'
print A, e[-1], result[0]
print

if __name__ == '__main__':
directory = '/Users/telliott_admin/Desktop/'
D = { 'code_fn':directory + '.RCode',
'results_fn':directory + 'results.txt',
'func':'t.test(A,alternative="less",mu=',
'n':3,'mu':10,'sigma':2,'mu_est':12,'N':1000 }

write_R_code(D)
utils.runR(D['code_fn'],v=True)
utils.cleanup_R_files(D['code_fn'],directory,v=True)
L = utils.load_data(D['results_fn'])
compare_to_my_result(L)


utils.py

from __future__ import division
import os,subprocess
import numpy as np

def unbiased_var(X):
n = len(X)
sample_SS = sum(X**2) - sum(X)**2 / n
return sample_SS/ (n-1)

def unbiased_std(X):
return np.sqrt(unbiased_var(X))

def runR(code_fn,v=False):
if v: print 'run'
cmd = 'R CMD BATCH ' + code_fn
p = subprocess.Popen(cmd, shell=True)
sts = os.waitpid(p.pid, 0)
if v: print sts
if sts[1] != 0: show_R_error()

def show_R_error():
fn = directory + '.Rcode.Rout'
FH = open(fn, 'r')
msg = FH.read()
FH.close()
print '\n'.join(msg.split('\n')[-3:]).strip()

def cleanup_R_files(code_fn,directory,v=False):
if v: print 'clean'
L = ['.RCode.Rout','.RData', '.RHistory']
L = [code_fn] + [directory + e for e in L]
for fn in L:
try:
os.remove(fn)
if v: print fn.split('/')[-1], 'removed'
except OSError:
if v: print fn.split('/')[-1], 'not found'
print

def load_data(results_fn):
def convert(s):
L = s.split('\n')
return [float(n) for n in L]
FH = open(results_fn, 'r')
data = FH.read().strip()
FH.close()
L = data.split('\n\n')
rL = [convert(s) for s in L]
return rL

No comments: