Skip to content

Introduction

Developing distributed models using LMF is straightforward given knowledge of how to use the Horovod python library. All other infrastructure (e.g., MPI, data loading) is handled by the Lucd platform.

PyTorch

The current version of LMF using Horovod model training was only tested using PyTorch models. Additionally, the LMF full model approach must be used for Horovod modeling. Using other frameworks such as TensorFlow might work, but they are not officially supported at this time. Examples of how to use Horovod in PyTorch can be found at https://github.com/horovod/horovod/tree/master/examples/pytorch. A specific example of a model used for testing with LMF can be found in The Lucd Model Shop.

For convenience, an example is included below.

import traceback
import horovod.torch as hvd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from eda.lib import uds
from eda.lib import ml as lml
from eda.lib import constants
from eda.internal import train
from eda import log

# Seed the torch backend so weight initialization is deterministic across runs.
torch.manual_seed(1234)

# Define the PyTorch neural network!
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()

        # define layers
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=12, kernel_size=5)

        self.fc1 = nn.Linear(12 * 4 * 4, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=60)
        self.fc3 = nn.Linear(in_features=60, out_features=10)
        self.out = nn.Softmax(dim=1)

    def forward(self, t):
        # conv 1
        t = self.conv1(t)
        t = F.relu(t)
        t = F.max_pool2d(t, kernel_size=2, stride=2)

        # conv 2
        t = self.conv2(t)
        t = F.relu(t)
        t = F.max_pool2d(t, kernel_size=2, stride=2)

        # fc1
        t = t.reshape(-1, 12 * 4 * 4)
        t = self.fc1(t)
        t = F.relu(t)

        # fc2
        t = self.fc2(t)
        t = F.relu(t)

        # output
        t = self.fc3(t)
        t = self.out(t)

        return t


# Function used in classification models to map int labels to string names
def _label_mapping():
    fashion_ret_dict = {0: 'T-shirt_top', 1: 'Trouser', 2: 'Pullover', 3: 'Dress', 4: 'Coat',
                        5: 'Sandal', 6: 'Shirt', 7: 'Sneaker', 8: 'Bag', 9: 'Ankle boot'}
    return fashion_ret_dict


# Entrypoint of the Lucd Advanced modeling framework (your main function is what gets called!)
def main(args):
    """Train the Fashion-MNIST CNN under Horovod and persist results to Lucd.

    Initializes Horovod, splits the virtual dataset into train/eval/test,
    trains ``Net`` with a distributed SGD optimizer, pushes loss/accuracy
    plots (plus ROC/PR curves and a confusion matrix on the final epoch)
    to the backend, and — on rank 0 only — zips and uploads the trained
    model with its performance stats.

    Args:
        args: dict supplied by the Lucd platform. Keys read here:
            'parameters' ('lr', 'test_percent', 'eval_percent', 'steps'),
            'vds' ('training'), 'exportdir', 'train_id', 'model',
            'graphversion'.
            NOTE(review): schema inferred from the lookups below — confirm
            against the platform documentation.

    Raises:
        Re-raises any exception after logging the traceback.
    """
    # Initialize Horovod before any collective ops (broadcasts below).
    hvd.init()
    # Get required training parameters
    lr = args["parameters"]['lr']
    vds = args['vds']['training']
    testing_dataset_percent = args['parameters']['test_percent']
    evaluation_dataset_percent = args['parameters']['eval_percent']
    log_dir = str(args['exportdir'])
    tid = args['train_id']
    model_id = args['model']
    graph_version = args['graphversion']
    training_steps = args['parameters']['steps']

    # Delete previous plots from this training object if they exist
    train.delete_plots({'uid': tid})
    train.delete_confusion({'uid': tid})

    # Set class names for target variable
    class_names = ['T-shirt_top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag',
                   'Ankle boot']

    # Retrieve dataset, instantiate PyTorch DataLoaders, and train!
    try:
        train_df, eval_df, test_df, _ = uds.train_eval_test_split_pytorch(vds,
                                                                          evaluation_dataset_percent,
                                                                          testing_dataset_percent,
                                                                          'classification')

        # Create iterator objects for train and valid datasets
        trainloader = DataLoader(train_df, batch_size=10)
        validloader = DataLoader(eval_df, batch_size=10)

        # choose optimizer and loss function
        net = Net()
        # NLLLoss expects log-probabilities; the forward pass emits Softmax
        # probabilities, so torch.log(...) is applied to the outputs below.
        criterion = nn.NLLLoss()

        # Wrap SGD with Horovod's distributed optimizer, then broadcast the
        # rank-0 model/optimizer state so all workers start identically.
        optimizer = hvd.DistributedOptimizer(torch.optim.SGD(net.parameters(), lr=lr),
                                             named_parameters=net.named_parameters())
        hvd.broadcast_parameters(net.state_dict(), root_rank=0)
        hvd.broadcast_optimizer_state(optimizer, root_rank=0)

        # optimizer = torch.optim.SGD(net.parameters(), lr=lr)

        # PyTorch training loop (with variable initialization)
        validate_frequency, accuracy, final_loss = 10, 0.0, 0.0
        train_dict, eval_dict = {}, {}
        predictions = []
        for epoch in range(training_steps):
            # Lists for collecting generated information
            train_loss, valid_loss, predictions = [], [], []
            tl, total_items, step = 0, 0, 0

            # Train the model w/ forward and backwards pass
            for data in trainloader:
                net.train()
                optimizer.zero_grad()
                # 1e-20 guards log(0) when Softmax saturates to exactly 0.
                out = torch.log(net(data[constants.PYTORCH_FEATURES]) + 1e-20)
                loss = criterion(out, data[constants.PYTORCH_LABEL])
                loss.backward()
                optimizer.step()
                train_loss.append(loss.item())
                tl += loss.item()
                total_items += len(data[constants.PYTORCH_FEATURES])
                step += 1

            # Compute the loss values for recording and add to training plots dictionary
            # NOTE(review): loss.item() is already a per-batch mean (NLLLoss
            # default reduction), so dividing the summed means by the item
            # count is not a true per-item average — confirm intended.
            epoch_avg_loss = tl / total_items
            # Compute the per-epoch average loss and set up the plots dictionary
            print("epoch, avg loss: (" + str(epoch) + ", " + str(epoch_avg_loss) + ")")

            final_loss = epoch_avg_loss
            if "average_loss" not in train_dict:
                train_dict["average_loss"] = {
                    "average_loss": [],
                    "labels": ["Epoch", "Average Loss"],
                    "description": "Average loss throughout training epochs."
                }

            # Store the average loss information every "validate_frequency" epochs
            if epoch % validate_frequency == 0:
                train_dict["average_loss"]["average_loss"].append([epoch, epoch_avg_loss])

                # Only rank 0 writes to the backend, to avoid duplicates.
                if hvd.rank() == 0:
                    lml.update_plots(True, tid, train_dict)

            # Evaluation loop
            # NOTE(review): the net is left in train mode here; harmless for
            # this architecture (no dropout/batchnorm) but net.eval() would
            # be conventional before validation.
            output_list = []
            truth_list = []
            if epoch % validate_frequency == 0 or (epoch == training_steps - 1):
                correct = 0
                total = 0
                with torch.no_grad():
                    for data in validloader:
                        outputs = torch.log(net(data[constants.PYTORCH_FEATURES]) + 1e-20)
                        _, predicted = torch.max(outputs.data, 1)
                        predictions.extend(predicted)
                        total += data[constants.PYTORCH_LABEL].size(0)
                        correct += (predicted == data[constants.PYTORCH_LABEL]).sum().item()

                        for x in outputs.numpy():
                            output_list.append(x)

                        for y in data[constants.PYTORCH_LABEL].numpy():
                            truth_list.append(y)

                # Compute accuracy and store in the database for plotting in the GUI!
                accuracy = 100 * correct / total
                if "accuracy" not in eval_dict:
                    eval_dict["accuracy"] = {
                        "accuracy": [],
                        "labels": ["Epoch", "Accuracy"],
                        "description": "Validation Accuracy"
                    }
                eval_dict["accuracy"]["accuracy"].append([epoch, accuracy])

                if hvd.rank() == 0:

                    # Store plots to backend
                    lml.update_plots(False, tid, eval_dict)

                    # Generate ROC & PR plots on the last epoch
                    if epoch == training_steps - 1:
                        roc_auc, curve_dict, thresh = lml.lucd_roc_curve(truth_list,
                                                                         output_list,
                                                                         ['T-shirt_top', 'Trouser', 'Pullover', 'Dress',
                                                                          'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag',
                                                                          'Ankle boot'],
                                                                         tid, True)

                        precision, recall, response_dict, thresholds = lml.lucd_precision_recall_curve(truth_list,
                                                                                                       output_list,
                                                                                                       ['T-shirt_top',
                                                                                                        'Trouser',
                                                                                                        'Pullover', 'Dress',
                                                                                                        'Coat', 'Sandal',
                                                                                                        'Shirt',
                                                                                                        'Sneaker', 'Bag',
                                                                                                        'Ankle boot'],
                                                                                                       tid, True)

        if hvd.rank() == 0:
            # Generate a confusion matrix for the GUI
            lml.lucd_confusion_matrix(validloader, predictions, 10, _label_mapping(), tid, True)

            # Store model graph and performance stats back to Lucd back-end
            model_filename = uds.zip_model_pt(net, model_id, log_dir + '/fmnist_model.pt', graph_version)
            with open(model_filename, "rb") as graph_file:
                # log.debug("storing graph")
                train.update({tid: {
                    'performance': {
                        'loss': final_loss,
                        'accuracy': accuracy
                    },
                    'graph_version': graph_version,
                    'ordered_class_names': class_names,
                    'graph_file': graph_file.read()
                }})

        # Block until all Horovod workers have finished.
        hvd.join()
    except Exception as err:
        traceback.print_exc()
        error = traceback.format_exc()
        log.error("Error training the model: {0} {1} \n\n".format(err, error))
        raise