Part 2: Simulations¶
To start this notebook, we recall the setting we are interested in. We want to understand the impact of noise (for instance, observation error, or any other form of statistical uncertainty) on the performance of a particular kind of RNN: echo state networks, and more generally reservoir computers. Because there is noise, we need to sample the reservoir many times to get a good estimate of the output weights. Sampling could in principle be trivially parallelized as well, but we will instead parallelize over different copies of the reservoir, for two reasons: we expect the number of reservoirs needed to be larger than the number of samples needed, and repeated sampling is just the special case in which the different reservoirs happen to be identical.
With these points in mind, we turn to actually generating some results! We implement an efficient GPU simulator for multiple stacked reservoirs. After averaging over an ensemble, we are left with only a few free parameters. First, the problem is heavily overparameterized, so ridge regression is necessary; we sweep the ridge parameter. That leaves the reservoir size and the amount of noise, and for each point on this two-dimensional grid we report an NRMSE value.
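For reference, the error metric we report throughout is the normalized root-mean-square error computed in the code below,
$$ \mathrm{NRMSE}(\hat{y}, y) = \sqrt{\frac{\tfrac{1}{T}\sum_{t=1}^{T}\left(\hat{y}_t - y_t\right)^2}{\mathrm{Var}(y)}}, $$
so a predictor that always outputs the mean of the target scores close to 1.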
import torch
import numpy as np
import os
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from itertools import combinations
from functools import reduce
from time import time
import pickle
def fetchData(n):
    # Generate n samples of the NARMA10 task: a random input sequence u and
    # the corresponding nonlinear autoregressive target y.
    history = 10
    while True:
        u = 0.5 * np.random.uniform(size=(n + history))
        y = np.zeros(shape=(n + history))
        for i in range(history, n + history):
            y[i] = 0.3 * y[i-1] + 0.05 * y[i-1] * np.sum(y[i-history:i]) \
                   + 1.5 * u[i-1] * u[i-history] + 0.1
        # The recurrence occasionally diverges; suppress the overflow warnings
        # and resample until we get a finite trajectory.
        import warnings
        warnings.filterwarnings("ignore")
        if np.isfinite(y).all():
            return (u[history:], y[history:])
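As a quick sanity check (a minimal usage sketch, not part of the original pipeline), we can draw a short NARMA10 sequence and confirm the shapes and that the resampling loop really returns a finite trajectory:
u, y = fetchData(1000)
print(u.shape, y.shape)      # (1000,), (1000,)
print(np.isfinite(y).all())  # True: divergent trajectories are resampled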
def ESN_init(inSize, outSize, resSize, alpha, sparsity):
    # Random input weights and a sparse random recurrent matrix,
    # rescaled to unit spectral radius.
    Win = np.random.rand(inSize + 1, resSize) - 0.5
    W = np.random.rand(resSize, resSize) - 0.5
    W[np.random.rand(resSize, resSize) > sparsity] = 0
    spec_rad = max(abs(np.linalg.eig(W)[0]))
    W /= spec_rad
    return Win, W
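Another small check (again just an illustrative sketch): the recurrent matrix really does come out with unit spectral radius.
np.random.seed(0)
Win, W = ESN_init(1, 1, 50, 0.7, 0.8)
print(Win.shape, W.shape)              # (2, 50), (50, 50)
print(max(abs(np.linalg.eigvals(W))))  # ~1.0 by construction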
def res(data, Ws, Wins, resSize, device):
    # Simulate len(Ws) independent reservoirs on the same input sequence and
    # return the design matrices [1, u(t), r(t)] with the warmup removed.
    inSize, outSize, alpha, sparsity = 1, 1, 0.7, 0.8
    data_tensor = torch.tensor(data, dtype=torch.float32).unsqueeze(0)
    data_copies = data_tensor.repeat(len(Ws), 1)
    W_copies = torch.stack([torch.tensor(W, dtype=torch.float32, device=device) for W in Ws]).to(device)
    Win_copies = torch.stack([torch.tensor(Win, dtype=torch.float32, device=device) for Win in Wins]).to(device)
    R_copies = 0.1 * (torch.rand((len(Ws), resSize), device=device) - 0.5).to(device)
    # Stream the input through the GPU in chunks of chunk_size time steps so
    # that the full design matrix never has to live on the device all at once.
    chunk_size = 1
    i = 0
    dm = torch.zeros((len(Ws), chunk_size, 1 + inSize + resSize))
    chunk = data_copies[:, chunk_size*i:chunk_size*(i+1)]
    rtn = []
    while chunk.shape[1] != 0:
        dm = dm.to(device)
        chunk = chunk.to(device)
        for t in range(chunk.size(1)):
            u = chunk[:, t].unsqueeze(-1)  # current input for every reservoir copy
            ones = torch.ones(u.shape[0], 1, device=device)
            ones = torch.hstack((ones, u))  # [1, u(t)] per copy
            # Leaky-tanh reservoir update, applied to all copies at once.
            R_copies = (1 - alpha)*R_copies +\
                alpha*torch.tanh(torch.einsum('ij,ijk->ik', ones, Win_copies) +\
                torch.einsum('ij,ijk->ik', R_copies, W_copies))
            # Indices: copy, time, signal.
            dm[:, t, :] = torch.cat((ones, R_copies), dim=1)
        rtn.append(dm.cpu())
        i += 1
        dm = torch.zeros((len(Ws), chunk_size, 1 + inSize + resSize))
        chunk = data_copies[:, chunk_size*i:chunk_size*(i+1)]
    rtn = torch.cat(rtn, dim=1)[:, 50:data_copies.size(1), :]  # cut out the warmup
    return rtn
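A brief shape check on the simulator (illustrative only; `device` could equally be `"cuda"` or `"mps"` here):
device = torch.device("cpu")
u, y = fetchData(500)
Wins, Ws = zip(*[ESN_init(1, 1, 10, 0.7, 0.8) for _ in range(4)])
dm = res(u, list(Ws), list(Wins), 10, device)
print(dm.shape)  # (4, 450, 12): (reservoir copies, time steps after the 50-step warmup, 1 + inSize + resSize)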
The first new detail is that our simulations have noise, which modifies the least squares solution. We recall from the last notebook that the functional form of the least squares solution without noise is
$$ \begin{align} W_{out} = (X^TX)^{-1}X^TY = X^+Y \end{align} $$ where $+$ denotes the pseudo-inverse. Generalizing to the noisy setting, the form becomes
$$ \begin{align} W_{out} = \langle \overline{XX^T}\rangle^+ \langle \overline{XY^T} \rangle, \end{align} $$ where $\langle \cdot \rangle$ is an average over the noise and $\overline{\cdot}$ is an average over the input randomness (we have been using the uniform measure). The derivation can be found in [1]; for this note it suffices to say that the solution now depends on the covariance of the reservoir data $X$ with itself and on the covariance between the targets $Y$ and the reservoir data. We compute these averages first, wrapped in a function so that we can parallelize over GPUs. We additionally chunk all computations to keep the GPU from running out of memory.
[1] Polloreno, Anthony M., Reuben RW Wang, and Nikolas A. Tezak. "A Note on Noisy Reservoir Computation." arXiv preprint arXiv:2302.10862 (2023).
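Before the full GPU implementation below, here is a minimal NumPy sketch of that estimator (the toy data, shot count, noise level, and ridge value are invented for the sketch): we estimate the noise-averaged second moments by averaging over repeated noisy copies of the same design matrix, then solve the ridge-regularized system. The design matrix is stored time-by-features, so the note's $\langle \overline{XX^T}\rangle$ corresponds to `X.T @ X` here.
rng = np.random.default_rng(0)
T, F, shots, sigma, lam = 200, 8, 10, 0.05, 1e-6
X_clean = rng.uniform(-0.5, 0.5, size=(T, F))  # stand-in for reservoir features
w_true = rng.normal(size=F)
Y = X_clean @ w_true                           # stand-in target
XtX, XtY = np.zeros((F, F)), np.zeros(F)
for _ in range(shots):
    X = X_clean + sigma * rng.normal(size=X_clean.shape)  # fresh noise per shot
    XtX += X.T @ X / shots
    XtY += X.T @ Y / shots
W_out = np.linalg.solve(XtX + lam * np.eye(F), XtY)
print(np.round(W_out - w_true, 3))             # small shrinkage toward zero from the noise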
def gpu_job(device, res_size, noise, ridge):
    # Run one grid point (reservoir size, noise level, ridge value) on a single
    # device. Note that train_cycles and test_cycles are module-level globals,
    # set by whatever driver launches these jobs.

    # Utilities.
    def compute_wout():
        # Noisy least squares: sum X X^T over time and average over shots,
        # average the design matrix against the targets, then solve the
        # ridge-regularized system. Everything is chunked over the
        # reservoir-copy dimension to avoid exhausting GPU memory.
        chunk_size = 1
        j, b, t, i_k = dms.size()  # (shots, reservoir copies, time, features)
        num_chunks = (b + chunk_size - 1) // chunk_size
        a_chunks = []
        for chunk_idx in range(num_chunks):
            start_idx = chunk_idx * chunk_size
            end_idx = min(start_idx + chunk_size, b)
            dms_chunk = dms[:, start_idx:end_idx, :, :].to(device)
            a_chunk = torch.einsum('jbti,jbtk -> bitk', dms_chunk, dms_chunk)
            a_chunk = torch.sum(a_chunk, axis=2) / numshots
            a_chunks.append(a_chunk)
        a = torch.cat(a_chunks, dim=0)
        avg = sum(dms) / len(dms)  # shot-averaged design matrices
        i_total, j, k = avg.size()
        num_chunks = (i_total + chunk_size - 1) // chunk_size
        result = []
        for chunk_idx in range(num_chunks):
            start_idx = chunk_idx * chunk_size
            end_idx = min((chunk_idx + 1) * chunk_size, i_total)
            avg_chunk = avg[start_idx:end_idx, :, :].to(device)
            result_chunk = torch.einsum('ikj,k->ij', avg_chunk, Y_train[50:])
            result.append(result_chunk)
        b = torch.cat(result, dim=0)
        ridge_mat = torch.eye(a.shape[1]).to(device)
        Wout = torch.linalg.lstsq(a + ridge*ridge_mat, b, rcond=None)[0]
        return Wout

    def compute_powerset(data):
        # Expand the signals into products over every nonempty subset of
        # features; both the raw product and a max-normalized version are kept.
        to_reduce = [data[:, :, idx] for idx in generate_powerset(range(data.shape[2]))[1:]]
        ones = torch.ones(to_reduce[0][:, :, 0].shape)
        rtn = []
        for r in to_reduce:
            rtn.append(reduce(lambda a, b: a*b, [r[:, :, i] for i in range(r.shape[2])], ones))
            rtn.append(reduce(lambda a, b: ((a*b).T/(torch.max(torch.abs(a*b), dim=1)[0])).T,
                              [r[:, :, i] for i in range(r.shape[2])], ones))
        return rtn

    def set_seed(seed):
        np.random.seed(seed)
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(seed)

    def generate_powerset(s):
        # All subsets of s, including the empty set.
        power_set = []
        for subset_size in range(len(s)+1):
            for subset in combinations(s, subset_size):
                power_set.append(list(subset))
        return power_set

    set_seed(0)
    inSize, outSize, resSize, alpha, sparsity = 1, 1, res_size, 0.7, 0.8
    data, Y = fetchData(train_cycles + test_cycles + 100)
    data_train, Y_train = data[:train_cycles+50], Y[:train_cycles+50]
    data_test, Y_test = data[train_cycles+50:], Y[train_cycles+50:]

    # Initialize reservoir networks.
    Wins, Ws = [], []
    num_reservoirs = 500  # hard coded, for now.
    for _ in range(num_reservoirs):
        Win, W = ESN_init(inSize, outSize, resSize, alpha, sparsity)
        Wins.append(Win)
        Ws.append(W)

    # Sample the noisy reservoir numshots times on the training data.
    numshots = 10
    dms = []
    for _ in range(numshots):
        dm = res(data_train, Ws, Wins, resSize, device)
        # Simple noise model.
        dm += torch.normal(mean=0, size=dm.shape, std=noise)
        rtn = compute_powerset(dm)
        dm = torch.stack(rtn, dim=2)
        dms.append(dm)
    dms = torch.stack(dms)

    # Noise-free test run through the same reservoirs.
    test = res(data_test, Ws, Wins, resSize, device)
    rtn = compute_powerset(test)
    test = torch.stack(rtn, dim=2)

    Y_train = torch.tensor(Y_train).float()
    Y_train = Y_train.to(device)
    Wout = compute_wout()
    # With the power set, it's too big for the GPU.
    Wout = Wout.to(torch.device('cpu'))
    Yhat = torch.einsum('ijk,ik->ij', test, Wout).to(device)
    expand_test = torch.tensor(Y_test[50:])[:, None].expand(-1, dm.shape[0]).T.to(device)
    NRMSEs_lstsq = torch.sqrt(torch.mean((expand_test-Yhat)**2, axis=1)/torch.var(expand_test, axis=1))
    rtn = []
    # Remove outliers before averaging over the ensemble.
    for n in NRMSEs_lstsq:
        if n <= 2:
            rtn.append(n.cpu())
    rtn = np.mean(rtn)
    return res_size, rtn
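The sweep itself is dispatched outside this notebook; purely as an illustration (the grid values, device list, and output filename below are invented, and `gpu_job` additionally expects the globals `train_cycles` and `test_cycles` to be set), a driver built from the `ThreadPoolExecutor`, `as_completed`, and `pickle` imports above might look like this:
# Hypothetical sweep driver: one worker per device, one gpu_job call per grid point.
train_cycles, test_cycles = 1000, 200          # assumed values for the sketch
devices = [torch.device(f"cuda:{i}") for i in range(torch.cuda.device_count())] or [torch.device("cpu")]
res_sizes = [2, 3, 4, 5]                       # reservoir sizes to sweep (illustrative)
noises = [0.0, 0.01, 0.1]                      # noise levels (illustrative)
ridges = [1e-8, 1e-4, 1e-1]                    # ridge parameters (illustrative)
jobs = [(r, n, l) for r in res_sizes for n in noises for l in ridges]
results = {}
with ThreadPoolExecutor(max_workers=len(devices)) as pool:
    futures = {pool.submit(gpu_job, devices[i % len(devices)], r, n, l): (r, n, l)
               for i, (r, n, l) in enumerate(jobs)}
    for fut in as_completed(futures):
        results[futures[fut]] = fut.result()[1]  # ensemble-averaged NRMSE
with open("sweep_results.pkl", "wb") as f:       # hypothetical filename
    pickle.dump(results, f)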
For this simulation, we have added Gaussian noise to each output of the reservoir. This is a simple noise model, and as we will see in the next notebook, it keeps the reasoning about how noise affects reservoir performance simple enough for a blog post. We execute the code and analyze the results in the next notebook; but first, a note about choosing the number of reservoirs in each simulation.
To reason about the number of reservoirs we should include in each simulation, note that Chebyshev's inequality [2] tells us that for a random variable $X$ with mean $\mu$ and variance $\sigma_0^2$,
$\Pr(|X-\mu |\geq k\sigma_0 )\leq {\frac {1}{k^{2}}}$.
For an ensemble average over $N$ reservoirs, if the ensemble has variance $\sigma_0^2$, then the sample mean $X_N = \frac{1}{N} \sum_{i=1}^N X_i$ has mean $\mu$ and variance $\sigma^2 = \sigma_0^2/N$, so that
$\Pr(|X_N-\mu|\geq k\sigma_0 ) \leq {\frac {1}{Nk^{2}}}$.
We don't know a priori what the distribution of $X$ is, but we can reason about it a bit. If it were Gaussian, the standard result gives $\Pr(|X-\mu|\leq \sigma_0) \approx 68\%$, so the distribution-free Chebyshev bound above is conservative.
We also know empirically that $\sigma_0 \lesssim 1$ (since the NRMSE itself is less than 1), so taking $k \sim 0.5$ and $N \sim 500$ gives a bound of $\sim 1\%$. This is a very coarse bound on the deviation from the mean, but it holds with very high probability, and we will see that in practice it works much better than these numbers suggest.
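Concretely, plugging these numbers into the bound above,
$$ \Pr\big(|X_N-\mu| \geq 0.5\,\sigma_0\big) \leq \frac{1}{N k^2} = \frac{1}{500 \cdot 0.25} = 0.8\%. $$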
Finally, we include an appendix on speeding up the simulation for larger networks, where more training data is necessary and reservoir simulation times are therefore longer.
Appendix: Alternate Timewise Parallelization Scheme¶
Alternatively, we can take advantage of GPU parallelism by parallelizing the reservoir simulation in the time dimension. This isn't obvious, so let's check it out: we can validate on the test set (or train on the training set!) roughly $n$ times as fast if we break the data into $n$ pieces. From the last notebook, we can plot our performance on NARMA10:
from reservoir import plot_NARMA
Wout, Echo, Y_test, data_test = plot_NARMA()
0.5887707279682656
Speeding up simulation¶
Now, we'll plot the testing data for the same trained reservoir, but split into two batches.
import matplotlib.pyplot as plt
from reservoir import reservoir_slow, fetchData
import numpy as np
np.random.seed(137)
plt.figure(figsize=(6, 12))
index = len(data_test) // 2
RA_Test1 = reservoir_slow(data_test[:index], *Echo, True)
RA_Test2 = reservoir_slow(data_test[index:], *Echo, True)
# Plot for first half of data
plt.subplot(2, 1, 1)
plt.plot(Y_test[:index], color='red', linewidth=5, label='Target Value')
plt.plot(np.dot(RA_Test1, Wout), color='green', linestyle=":", linewidth=2,
label='Test Prediction - First Half')
plt.legend()
plt.ylabel("NARMA(t)")
plt.xlabel("Test Sample (t)")
plt.title("Test Performance")
plt.yscale('log')
plt.ylim(.2,1)
# Plot for second half of data
plt.subplot(2, 1, 2)
plt.plot(Y_test[index:], color='red', linewidth=5, label='Target Value')
plt.plot(np.dot(RA_Test2, Wout), color='green', linestyle=":", linewidth=2,
label='Test Prediction - Second Half')
plt.legend()
plt.ylabel("NARMA(t)")
plt.xlabel("Test Sample (t)")
plt.title("Test Performance")
plt.yscale('log')
plt.ylim(.2,1)
plt.show()
The green spikes at the beginning need to be thrown away, but now these are embarrassingly parallel matrix multiplications. Let's see if we can get a speedup by taking advantage of this.
import torch
from functools import reduce
from itertools import combinations
def reservoir(data, Win, Wres, inSize, resSize, alpha, batch_number, power_set):
    # Apple "metal performance shaders".
    device = torch.device("mps")
    # Convert Wres, Win, and data to tensors on the device.
    Wres = torch.tensor(Wres, device=device, dtype=torch.float32)
    Win = torch.tensor(Win, device=device, dtype=torch.float32)
    data = torch.tensor(data, device=device, dtype=torch.float32)
    batch_size = len(data) // batch_number
    # We'll trim some of the data for simplicity; we can fix this later.
    data = data[:batch_number*batch_size]
    new_data = data.view(batch_number, batch_size)
    # Replicate Wres, Win batch_number times so every time-chunk runs in parallel.
    Wres_copies = Wres.repeat(batch_number, 1, 1)
    Win_copies = Win.repeat(batch_number, 1, 1)
    R_copies = 0.1 * (torch.ones((batch_number, resSize), device=device) - 0.5)
    dm = torch.zeros((batch_number, batch_size - 50, 1 + inSize + resSize), device=device)
    for t in range(batch_size):
        u = new_data[:, t, None]
        ones = torch.hstack((torch.ones(u.shape[0], 1, device=device), u))
        R_copies = (1 - alpha)*R_copies +\
            alpha*torch.tanh(torch.einsum('ij,ijk->ik', ones, Win_copies) +\
            torch.einsum('ij,ijk->ik', R_copies, Wres_copies))
        # The first 50 steps of every chunk are warmup and are discarded.
        if t >= 50:
            dm[:, t-50, :] = torch.cat((ones, R_copies), dim=1)
    dm = dm.view(-1, 1+inSize+resSize)
    new_data = new_data[:, 50:].flatten()
    s = list(dm.T)[2:]  # the reservoir state signals (skip the bias and input columns)
    chosen_subsets = []
    power_signals = []
    if power_set:
        # Generate all subsets and take the product of the signals in each one.
        ones = torch.ones(new_data.shape[0], dtype=torch.float32)
        for subset_size in range(len(s)+1):
            for subset in combinations(s, subset_size):
                chosen_subsets.append([l.cpu().numpy() for l in subset])
        power_signals = [reduce(lambda a, b: a*b, el, ones.numpy()) for el in chosen_subsets]
        power_signals = [torch.tensor(pw, dtype=torch.float32) for pw in power_signals]
    power_signals = [el.cpu() for el in list(dm.T)] + power_signals
    return torch.vstack(power_signals).T, new_data
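A quick usage sketch (assuming Apple silicon, since the function hard-codes the `mps` device; the sizes here are illustrative and use the `ESN_init` defined earlier in this notebook, which returns `(Win, W)`): with `power_set=True` the returned design matrix has $1 + \text{inSize} + \text{resSize} + 2^{\text{resSize}}$ columns, the raw signals plus one product per subset of reservoir states.
u, _ = fetchData(10_050)
Win, Wres = ESN_init(1, 1, 5, 0.7, 0.9)
signals, targets = reservoir(u, Win, Wres, 1, 5, 0.7, batch_number=100, power_set=True)
print(signals.shape, targets.shape)  # (5000, 39) and (5000,): 100 batches x 50 kept steps, 7 raw columns + 2**5 products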
Seeing the speed up¶
Now we are going to see if we get a speedup with this new code. In the next notebook we are going to simulate many different reservoirs to see their empirical performance as a function of size, where we also vary the amount of noise we inject into the reservoir. For this notebook, we will simply vary the amount of training data we consider in order to demonstrate a speedup. For small networks this much training data is unnecessary (and can even hurt, through overfitting); we are just benchmarking the code!
from reservoir import ESN_init, fetchData, reservoir_slow
import numpy as np
powerset = True
inSize = outSize = 1
resSize = 5
alpha = .7
sparsity = .9
train_cycles = 1000000
Echo = ESN_init(inSize, outSize, resSize, alpha, sparsity)
data_train, Y_train = fetchData(train_cycles)
In the next cell, we vary the number of batches. Parallelizing the computation over more batches is good, but the more batches we use, the more data we have to throw away.
from time import time
number_of_batches = [20000, 10000, 2000, 1000]
runtimes = []
for batch_number in number_of_batches:
start = time()
reservoir(data_train, *Echo, batch_number, powerset)
stop = time()
runtimes.append(stop-start)
print(runtimes)
[0.7789459228515625, 0.9311521053314209, 1.4612369537353516, 2.1751229763031006]
Because we need to throw away the initial samples while the reservoir "warms up", we lose data on each run (50 samples per batch). At the extreme of 20,000 batches with $10^6$ training samples, the batch size is exactly 50, so every sample is warmup and nothing survives. We'll benchmark the slow reservoir on each of these reduced training data lengths.
from time import time
number_of_batches = [20000, 10000, 2000, 1000]
runtimesserial = []
for batch_number in number_of_batches:
start = time()
reservoir_slow(data_train[:-50*batch_number], *Echo, powerset)
stop = time()
runtimesserial.append(stop-start)
print(runtimesserial)
[0.00034689903259277344, 9.030734062194824, 16.001220226287842, 16.72932481765747]
speedups = np.array(runtimesserial)/np.array(runtimes)
plt.plot(number_of_batches, speedups)
plt.title("Multiplicative Speedup of Parallel Implementation")
plt.xlabel("Number of batches")
plt.ylabel("Serial time / GPU time");
print(f"The maximum speed up is {np.max(speedups)}!")
The maximum speed up is 10.950462336299404!
Even with this simple optimization, just adding a batch index, we see that we decrease training time. As long as $50\times$ the number of batches is significantly smaller than the amount of training data, the cost of throwing data away is negligible, and we have turned a serial task into a parallel one. We expect a speedup as long as the parallelized computation still fits on the GPU. A better benchmark is to fix the goal, namely the amount of training data that survives after the warmup is discarded.
from time import time
from reservoir import ESN_init, fetchData, reservoir_slow
import numpy as np
train_cycles = 100000
number_of_batches = [500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000]
runtimes = []
for batch_number in number_of_batches:
Echo = ESN_init(inSize, outSize, resSize, alpha, sparsity)
data_train, Y_train = fetchData(batch_number * 50 + train_cycles)
start = time()
reservoir(data_train, *Echo, batch_number, powerset)
stop = time()
runtimes.append(stop-start)
print(runtimes)
[0.38776707649230957, 0.2525200843811035, 0.24639606475830078, 0.17734909057617188, 0.1612389087677002, 0.15805602073669434, 0.1435999870300293, 0.1475820541381836, 0.1337909698486328, 0.14001893997192383]
plt.plot(number_of_batches, runtimes)
plt.xlabel("Number of batches")
plt.ylabel("Total run time (s)")
plt.title("Run time versus number of batches for GPU implementation");
We can compare this to the serial implementation time.
from time import time
Echo = ESN_init(inSize, outSize, resSize, alpha, sparsity)
data_train, Y_train = fetchData(train_cycles)
start = time()
reservoir_slow(data_train, *Echo, powerset)
stop = time()
runtimesserial = stop - start
print(runtimesserial)
1.8172321319580078
And so we can plot the speedup again, this time remembering that we have fixed the amount of training data that remains after discarding the warmup. The result is similar; no surprises here.
plt.plot(number_of_batches, runtimesserial/np.array(runtimes))
plt.title("Multiplicative Speedup of Parallel Implementation")
plt.xlabel("Number of batches")
plt.ylabel("Serial time / GPU time");
In addition to the overhead from throwing away data, there is a ceiling imposed by the size of the M1 chip. It seems as if we have hit one of these limits, but we still outperform the serial implementation for every batch count. Now we're ready to use this simulator in the next notebook to see if we can observe any empirical scaling behavior for these networks in the presence of noise!