Curse of Dimensionality II: Solution to the Problem

This example class is intended to be open-ended, like a problem you would need to solve as a researcher. This is your opportunity to put everything you have learned in C1 into practice.

The numerical dataset you will use is here.

You should download it and put it in the same folder as this notebook.

Your goal is to build a neural network interpolator:

\[f(x_1, \ldots, x_5) = y\]

where the \(x_i\) are the features and \(y\) is the label. All are scalars.

[1]:
import pickle
import numpy as np
import pandas as pd

# 1. Load the pickled object
with open("interpolation_dataset.pkl", "rb") as f:
    data = pickle.load(f)

print("Type of loaded object:", type(data))
Type of loaded object: <class 'dict'>
[5]:
X = np.asarray(data["X"])
y = np.asarray(data["y"])
[6]:
print("X shape:", X.shape)
print("y shape:", y.shape)
X shape: (5000, 5)
y shape: (5000,)

Here are your tasks. You don’t need CSD3 for this. Use Google Colab if your laptop cannot run Jupyter notebooks (it should!):

  • Build a simple fully-connected neural network (FCNN / MLP) in PyTorch

    • Model of the form \(f(x_1, \ldots, x_5) = y\)

    • For example: 3 hidden layers with 64 neurons each, non-linear activations (e.g. ReLU)

For example:

import torch.nn as nn

class MLP(nn.Module):
    def __init__(self, in_dim=5, hidden=64, depth=3, out_dim=1, act=nn.SiLU):
        super().__init__()
        # Input layer, followed by (depth - 1) further hidden layers of equal width
        layers = [nn.Linear(in_dim, hidden), act()]
        for _ in range(depth - 1):
            layers += [nn.Linear(hidden, hidden), act()]
        # Linear output layer: no activation, since this is a regression
        layers += [nn.Linear(hidden, out_dim)]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x)
  • Train the network

    • Choose a suitable loss function (e.g. MSELoss for regression)

    • Choose an optimizer (e.g. Adam) and tune the learning rate and the number of epochs

    • Monitor the training loss as a function of epoch

For example (this is boilerplate that depends on routines not given here):

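import math
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset, random_split

# Use a GPU if one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# compute_norm_stats, apply_x_norm and apply_y_norm are assumed to be defined elsewhere
def train_model(X, y, batch_size=256, max_epochs=200, lr=5e-3, weight_decay=1e-6, patience=20):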
    stats = compute_norm_stats(X, y)
    Xn = apply_x_norm(X, stats).astype(np.float32)
    yn = apply_y_norm(y, stats).astype(np.float32)

    Xn_t = torch.from_numpy(Xn)
    yn_t = torch.from_numpy(yn).unsqueeze(1)

    ds = TensorDataset(Xn_t, yn_t)

    n_total = len(ds)
    n_val = max(200, int(0.15*n_total))
    n_test = max(200, int(0.15*n_total))
    n_train = n_total - n_val - n_test
    train_ds, val_ds, test_ds = random_split(ds, [n_train, n_val, n_test],
                                             generator=torch.Generator().manual_seed(123))

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, drop_last=False)
    val_loader   = DataLoader(val_ds, batch_size=1024, shuffle=False)
    test_loader  = DataLoader(test_ds, batch_size=1024, shuffle=False)

    model = MLP().to(device)
    opt = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=max_epochs)
    loss_fn = nn.MSELoss()

    # Early-stopping bookkeeping
    best_val = float("inf")
    best_state = None
    no_improve = 0
    for epoch in range(1, max_epochs+1):
        model.train()
        train_loss = 0.0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            opt.zero_grad()
            pred = model(xb)
            loss = loss_fn(pred, yb)
            loss.backward()
            opt.step()
            train_loss += loss.item() * xb.size(0)
        sched.step()

        model.eval()
        with torch.no_grad():
            val_loss = 0.0
            for xb, yb in val_loader:
                xb, yb = xb.to(device), yb.to(device)
                pred = model(xb)
                val_loss += loss_fn(pred, yb).item() * xb.size(0)
        # Convert the summed per-sample losses into per-sample averages
        train_loss /= len(train_ds)
        val_loss /= len(val_ds)

        if val_loss < best_val - 1e-6:
            best_val = val_loss
            best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
            no_improve = 0
        else:
            no_improve += 1

        if epoch % 20 == 0 or no_improve == 1:
            print(f"Epoch {epoch:4d} | train MSE {train_loss:.5f} | val MSE {val_loss:.5f} | lr {sched.get_last_lr()[0]:.3e}")

        if no_improve >= patience:
            print(f"Early stopping at epoch {epoch}. Best val MSE: {best_val:.5f}")
            break

    # Load best weights
    if best_state is not None:
        model.load_state_dict(best_state)

    # Evaluate on test
    model.eval()
    mse, n = 0.0, 0
    with torch.no_grad():
        for xb, yb in test_loader:
            xb, yb = xb.to(device), yb.to(device)
            pred = model(xb)
            mse += nn.functional.mse_loss(pred, yb, reduction="sum").item()
            n += xb.size(0)
    test_mse = mse / n
    test_rmse = math.sqrt(test_mse)
    print(f"Test RMSE (normalized target units): {test_rmse:.5f}")

    return model.cpu(), stats, (best_val, test_mse)
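
The training boilerplate above calls three normalization routines that are not given. A minimal sketch of what they could look like is shown here, assuming plain standardization (zero mean, unit variance). The dictionary keys and the extra helper `invert_y_norm` are hypothetical choices for illustration, not the routines actually used to produce the boilerplate.

```python
import numpy as np

def compute_norm_stats(X, y, eps=1e-8):
    """Per-feature mean/std of X and scalar mean/std of y (hypothetical layout)."""
    X = np.asarray(X, dtype=np.float64)
    y = np.asarray(y, dtype=np.float64)
    return {
        "x_mean": X.mean(axis=0),
        "x_std": X.std(axis=0) + eps,   # eps guards against constant features
        "y_mean": y.mean(),
        "y_std": y.std() + eps,
    }

def apply_x_norm(X, stats):
    """Standardize the features column by column."""
    return (np.asarray(X) - stats["x_mean"]) / stats["x_std"]

def apply_y_norm(y, stats):
    """Standardize the label."""
    return (np.asarray(y) - stats["y_mean"]) / stats["y_std"]

def invert_y_norm(yn, stats):
    """Map normalized predictions back to the original units of y."""
    return np.asarray(yn) * stats["y_std"] + stats["y_mean"]
```

Note that `train_model` as written computes these statistics on the full dataset before splitting. Computing them on the training split only would avoid leaking information from the validation and test sets, at the cost of slightly more bookkeeping.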
  • Test the network

    • Split the dataset into training and test sets

    • Evaluate the trained model on the test set only once training is complete

  • Evaluate performance / accuracy

    • Compute quantitative metrics (e.g. MSE, MAE, or \(R^2\)) on the test set

    • Produce diagnostic plots (e.g. predicted vs. true \(y\), residuals vs. \(y\), etc.)
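
For example, the metrics listed above can be computed directly with NumPy. This is a minimal sketch: the function name `regression_metrics` and the returned dictionary layout are assumptions made for illustration, and the inputs are expected to be the true and predicted labels on the test set, in the same units.

```python
import numpy as np

def regression_metrics(y_true, y_pred):
    """Return MSE, RMSE, MAE and R^2 for two arrays of equal length."""
    # ravel() avoids silent broadcasting between shapes (N,) and (N, 1)
    y_true = np.asarray(y_true, dtype=np.float64).ravel()
    y_pred = np.asarray(y_pred, dtype=np.float64).ravel()
    resid = y_pred - y_true

    mse = float(np.mean(resid ** 2))
    mae = float(np.mean(np.abs(resid)))
    ss_res = float(np.sum(resid ** 2))
    ss_tot = float(np.sum((y_true - y_true.mean()) ** 2))
    # R^2 is undefined when the true labels are all identical
    r2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else float("nan")

    return {"mse": mse, "rmse": mse ** 0.5, "mae": mae, "r2": r2}
```

The residuals `y_pred - y_true` computed inside this function are the same quantity you would plot against \(y\) in a residual plot. A model output of shape (N, 1) compared against labels of shape (N,) is a common source of wrong metrics, which is why both arrays are flattened first.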

  • Experiment with dataset size

    • Write your own helper function to generate more or less data from a custom generating function.

    • Retrain the network with different dataset sizes and compare performance
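
For example, a data-generation helper could look like the sketch below. The generating function `toy_function` is purely hypothetical (it is not the function behind the provided dataset), and the sampling range, noise option and list of sizes are likewise assumptions chosen for illustration.

```python
import numpy as np

def toy_function(X):
    """Hypothetical smooth 5-D target; NOT the function behind the provided dataset."""
    return (np.sin(X[:, 0]) + X[:, 1] * X[:, 2]
            + np.exp(-X[:, 3] ** 2) + 0.5 * X[:, 4])

def make_dataset(n_samples, func=toy_function, n_features=5,
                 low=-1.0, high=1.0, noise_std=0.0, seed=0):
    """Draw features uniformly in [low, high) and evaluate func on them."""
    rng = np.random.default_rng(seed)
    X = rng.uniform(low, high, size=(n_samples, n_features))
    y = func(X)
    if noise_std > 0:
        y = y + rng.normal(0.0, noise_std, size=n_samples)
    return X.astype(np.float32), y.astype(np.float32)

# One dataset per size, each with its own seed
sizes = [500, 5000, 50000]
datasets = {n: make_dataset(n, seed=n) for n in sizes}
for n, (X_n, y_n) in datasets.items():
    print(n, X_n.shape, y_n.shape)
```

Each `(X_n, y_n)` pair can then be passed to a training routine such as `train_model` and the resulting test errors compared. When interpreting the results, keep in mind that 5000 samples in 5 dimensions correspond to only about \(5000^{1/5} \approx 5.5\) points per axis, which is why the dataset size matters so much here.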

  • Investigate timing and memory usage

    • Measure training and inference time (e.g. per epoch, per batch, per sample)

    • Inspect approximate model size / memory footprint (number of parameters, dtype, etc.)

    • Discuss how model architecture and dataset size affect computational cost
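
For example, the parameter count of the MLP architecture described earlier can be worked out by hand, and wall-clock time can be measured with the standard library. This is a minimal sketch: `mlp_param_count` and `time_call` are hypothetical helper names, and the 4 bytes per parameter assume float32 weights.

```python
import time

def mlp_param_count(in_dim=5, hidden=64, depth=3, out_dim=1):
    """Weights + biases of an MLP with `depth` hidden layers of width `hidden`."""
    n = in_dim * hidden + hidden                    # input -> first hidden layer
    n += (depth - 1) * (hidden * hidden + hidden)   # hidden -> hidden layers
    n += hidden * out_dim + out_dim                 # last hidden -> output
    return n

def time_call(fn, *args, repeats=5, **kwargs):
    """Best-of-`repeats` wall-clock time of fn(*args, **kwargs), in seconds."""
    best = float("inf")
    for _ in range(repeats):
        t0 = time.perf_counter()
        fn(*args, **kwargs)
        best = min(best, time.perf_counter() - t0)
    return best

n_params = mlp_param_count()
print(f"parameters: {n_params}, float32 size: {n_params * 4 / 1024:.1f} KiB")

# Cost grows quadratically with width and linearly with depth
for hidden in (64, 128, 256):
    print(hidden, mlp_param_count(hidden=hidden))
```

With the default settings (5 inputs, 3 hidden layers of 64 neurons, 1 output) this gives 8769 parameters. For an actual PyTorch model the same number can be obtained with `sum(p.numel() for p in model.parameters())`. Two caveats when timing: on a GPU, kernels run asynchronously, so call `torch.cuda.synchronize()` before reading the clock, and during training the gradients and optimizer state (two extra buffers per parameter for Adam-type optimizers) add to the memory taken by the weights themselves.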

The emphasis here should be on the research computing aspects (time, memory, complexity) rather than on the machine learning aspects (which are covered in your other classes).