Curse of Dimensionality II: Solution to the Problem
This example class is intended to be open-ended, like a problem you would need to solve as a researcher. It is an opportunity to put everything you have learned in C1 into practice.
The numerical dataset you will use is here.
You should download it and put it in the same folder as this notebook.
Your goal is to build a neural network interpolator:
\(f(x_1, \ldots, x_5) = y\)
where the \(x_i\) are the features and \(y\) is the label. All are scalars.
[1]:
import pickle
import numpy as np
import pandas as pd
# 1. Load the pickled object
with open("interpolation_dataset.pkl", "rb") as f:
    data = pickle.load(f)
print("Type of loaded object:", type(data))
Type of loaded object: <class 'dict'>
[5]:
X = np.asarray(data["X"])
y = np.asarray(data["y"])
[6]:
print("X shape:", X.shape)
print("y shape:", y.shape)
X shape: (5000, 5)
y shape: (5000,)
Here are your tasks. You don’t need CSD3 for this. Use Google Colab if your laptop cannot run Jupyter notebooks (it should!):
Build a simple fully-connected neural network (FCNN / MLP) in PyTorch
Model of the form \(f(x_1, \ldots, x_5) = y\)
For example: 3 hidden layers with 64 neurons each, non-linear activations (e.g. ReLU)
For example:
import torch
from torch import nn

class MLP(nn.Module):
    def __init__(self, in_dim=5, hidden=64, depth=3, out_dim=1, act=nn.SiLU):
        super().__init__()
        # Input layer, then (depth - 1) further hidden layers, then a linear output
        layers = [nn.Linear(in_dim, hidden), act()]
        for _ in range(depth - 1):
            layers += [nn.Linear(hidden, hidden), act()]
        layers += [nn.Linear(hidden, out_dim)]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x)
Train the network
Choose a suitable loss function (e.g. MSELoss for regression)
Choose an optimizer (e.g. Adam) and tune the learning rate and the number of epochs
Monitor the training loss as a function of epoch
For example (this is boilerplate and depends on routines not given here):
import math
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset, random_split

# Assumption: `device` is not defined in the original; use a GPU if one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def train_model(X, y, batch_size=256, max_epochs=200, lr=5e-3, weight_decay=1e-6, patience=20):
    # Normalize features and targets (helper routines not given here)
    stats = compute_norm_stats(X, y)
    Xn = apply_x_norm(X, stats).astype(np.float32)
    yn = apply_y_norm(y, stats).astype(np.float32)
    Xn_t = torch.from_numpy(Xn)
    yn_t = torch.from_numpy(yn).unsqueeze(1)
    ds = TensorDataset(Xn_t, yn_t)

    # Reproducible train / validation / test split
    n_total = len(ds)
    n_val = max(200, int(0.15*n_total))
    n_test = max(200, int(0.15*n_total))
    n_train = n_total - n_val - n_test
    train_ds, val_ds, test_ds = random_split(ds, [n_train, n_val, n_test],
                                             generator=torch.Generator().manual_seed(123))
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, drop_last=False)
    val_loader = DataLoader(val_ds, batch_size=1024, shuffle=False)
    test_loader = DataLoader(test_ds, batch_size=1024, shuffle=False)

    model = MLP().to(device)
    opt = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=max_epochs)
    loss_fn = nn.MSELoss()
    best_val = float("inf")
    best_state = None
    no_improve = 0

    for epoch in range(1, max_epochs+1):
        model.train()
        train_loss = 0.0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            opt.zero_grad()
            pred = model(xb)
            loss = loss_fn(pred, yb)
            loss.backward()
            opt.step()
            train_loss += loss.item() * xb.size(0)
        sched.step()

        # Validation pass (no gradients)
        model.eval()
        with torch.no_grad():
            val_loss = 0.0
            for xb, yb in val_loader:
                xb, yb = xb.to(device), yb.to(device)
                pred = model(xb)
                val_loss += loss_fn(pred, yb).item() * xb.size(0)
        train_loss /= len(train_ds)
        val_loss /= len(val_ds)

        # Early stopping: keep a copy of the best weights seen so far
        if val_loss < best_val - 1e-6:
            best_val = val_loss
            best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
            no_improve = 0
        else:
            no_improve += 1
        if epoch % 20 == 0 or no_improve == 1:
            print(f"Epoch {epoch:4d} | train MSE {train_loss:.5f} | val MSE {val_loss:.5f} | lr {sched.get_last_lr()[0]:.3e}")
        if no_improve >= patience:
            print(f"Early stopping at epoch {epoch}. Best val MSE: {best_val:.5f}")
            break

    # Load best weights
    if best_state is not None:
        model.load_state_dict(best_state)

    # Evaluate on test
    model.eval()
    mse, n = 0.0, 0
    with torch.no_grad():
        for xb, yb in test_loader:
            xb, yb = xb.to(device), yb.to(device)
            pred = model(xb)
            mse += nn.functional.mse_loss(pred, yb, reduction="sum").item()
            n += xb.size(0)
    test_mse = mse / n
    test_rmse = math.sqrt(test_mse)
    print(f"Test RMSE (normalized target units): {test_rmse:.5f}")
    return model.cpu(), stats, (best_val, test_mse)
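The boilerplate above calls compute_norm_stats, apply_x_norm and apply_y_norm, which are not given. One possible form is sketched below, assuming plain z-score standardisation (per feature for X, a single mean and standard deviation for y). The dictionary keys, the eps guard and the extra invert_y_norm helper are illustrative choices, not part of the original routines.

```python
import numpy as np

def compute_norm_stats(X, y, eps=1e-8):
    """Mean/std per feature of X and for the scalar target y (assumed z-score scheme)."""
    return {
        "x_mean": X.mean(axis=0),
        "x_std": X.std(axis=0) + eps,   # eps avoids division by zero for constant features
        "y_mean": float(y.mean()),
        "y_std": float(y.std()) + eps,
    }

def apply_x_norm(X, stats):
    """Standardise each feature column of X."""
    return (X - stats["x_mean"]) / stats["x_std"]

def apply_y_norm(y, stats):
    """Standardise the target."""
    return (y - stats["y_mean"]) / stats["y_std"]

def invert_y_norm(yn, stats):
    """Map normalised predictions back to the original target units (hypothetical helper)."""
    return yn * stats["y_std"] + stats["y_mean"]
```

Note that the boilerplate computes the statistics on the full dataset before splitting; computing them on the training split only would avoid leaking information from the validation and test sets.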
Test the network
Split the dataset into training and test sets
Evaluate the trained model on the test set only once training is complete
Evaluate performance / accuracy
Compute quantitative metrics (e.g. MSE, MAE, or \(R^2\)) on the test set
Produce diagnostic plots (e.g. predicted vs. true \(y\), residuals vs. \(y\), etc.)
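The metrics listed above can be computed directly from arrays of true and predicted values. The sketch below uses NumPy only; the function name regression_metrics is an illustrative choice, and it assumes the true values are not all identical (otherwise \(R^2\) is undefined).

```python
import numpy as np

def regression_metrics(y_true, y_pred):
    """Return MSE, MAE and R^2 for two 1-D arrays of equal length."""
    y_true = np.asarray(y_true, dtype=float).ravel()
    y_pred = np.asarray(y_pred, dtype=float).ravel()
    resid = y_pred - y_true
    mse = float(np.mean(resid ** 2))
    mae = float(np.mean(np.abs(resid)))
    ss_res = float(np.sum(resid ** 2))                        # residual sum of squares
    ss_tot = float(np.sum((y_true - y_true.mean()) ** 2))     # total sum of squares
    r2 = 1.0 - ss_res / ss_tot
    return {"mse": mse, "mae": mae, "r2": r2}

# Small hand-checkable example
print(regression_metrics([1, 2, 3, 4], [1, 2, 3, 5]))
```

If the network was trained on normalized targets, transform the predictions back to the original units before computing the metrics, or state clearly which units are reported.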
Experiment with dataset size
Write your own helper function to generate more or less data from a custom generating function.
Retrain the network with different dataset sizes and compare performance
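A data-generation helper could look like the sketch below. The generating function example_function is purely hypothetical (it is not the function behind the provided dataset), and the sampling range, noise level and dataset sizes are arbitrary assumptions to be replaced with your own choices.

```python
import numpy as np

def example_function(X):
    """Hypothetical smooth 5-D target; NOT the function behind the provided dataset."""
    return np.sin(X[:, 0]) + X[:, 1] * X[:, 2] + 0.5 * X[:, 3] ** 2 - X[:, 4]

def make_dataset(n_samples, func=example_function, n_features=5,
                 low=-1.0, high=1.0, noise_std=0.0, seed=0):
    """Sample features uniformly in [low, high) and evaluate func, optionally adding noise."""
    rng = np.random.default_rng(seed)
    X = rng.uniform(low, high, size=(n_samples, n_features))
    y = func(X)
    if noise_std > 0:
        y = y + rng.normal(0.0, noise_std, size=n_samples)
    return X, y

# Datasets of increasing size, ready to be passed to a training routine
datasets = {n: make_dataset(n) for n in (500, 5000, 50000)}
for n, (X_n, y_n) in datasets.items():
    print(n, X_n.shape, y_n.shape)
```

Keeping the seed fixed makes runs at different sizes reproducible; evaluating every model on one common held-out test set makes the comparison across sizes fair.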
Investigate timing and memory usage
Measure training and inference time (e.g. per epoch, per batch, per sample)
Inspect approximate model size / memory footprint (number of parameters, dtype, etc.)
Discuss how model architecture and dataset size affect computational cost
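As a starting point for the timing and memory questions, the sketch below counts the parameters of a fully-connected network analytically and times an arbitrary callable with time.perf_counter. It uses the standard library only; the helper names count_mlp_params and time_call are illustrative, and the layer layout assumed is the one in the MLP example (depth hidden layers of equal width plus a linear output layer).

```python
import time

def count_mlp_params(in_dim=5, hidden=64, depth=3, out_dim=1):
    """Weights + biases for `depth` hidden layers of width `hidden` and one output layer."""
    dims = [in_dim] + [hidden] * depth + [out_dim]
    return sum(d_in * d_out + d_out for d_in, d_out in zip(dims[:-1], dims[1:]))

def time_call(fn, *args, repeats=5, **kwargs):
    """Best wall-clock time in seconds over `repeats` calls of fn(*args, **kwargs)."""
    best = float("inf")
    for _ in range(repeats):
        t0 = time.perf_counter()
        fn(*args, **kwargs)
        best = min(best, time.perf_counter() - t0)
    return best

n_params = count_mlp_params()   # 5*64+64 + 2*(64*64+64) + 64*1+1
size_bytes = n_params * 4       # 4 bytes per parameter in float32
print(f"parameters: {n_params}, float32 size: {size_bytes / 1024:.1f} KiB")
print(f"timing demo: {time_call(sum, range(100_000)):.2e} s")
```

For a PyTorch model, the same count is given by sum(p.numel() for p in model.parameters()). When timing on a GPU, call torch.cuda.synchronize() before reading the clock, because kernels are launched asynchronously.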
The emphasis here should be on the research computing aspects (time, memory, complexity) rather than on machine learning (which is covered in your other classes).