Lucd Modeling Framework | Code Examples | 6.2.7

Important Notes

When implementing the model script, except blocks (for handling exceptions) MUST end with raise rather than another terminating statement such as return. This ensures that the status of the model is accurately captured in the Lucd GUI.
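
Every example on this page follows the same pattern; as a minimal sketch (distilled from the examples below, with the training logic elided):

from eda import log
from eda.int import train


def main(args):
    tid = args['train_id']
    try:
        train.status(tid, 1)
        # ... training logic ...
        train.status(tid, 4)
    except Exception as exception:
        log.info(f"Caught exception: {exception}")
        train.status(tid, 5, "failure: error in training -- " + str(exception))
        raise  # re-raise (do not return) so the Lucd GUI reflects the failed status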

TensorFlow

Simple Classification

from eda.int import model
from eda import log
from eda.lib import lucd_uds as uds
from eda.lib import lucd_ml as lml
from eda.int import train

import tensorflow as tf
import numpy as np
from sklearn.metrics import precision_recall_fscore_support


def process_data(features, labels):
    # Can be passed to lucd_uds.train_eval_test_split_tensorflow for feature transformation, but will be
    # deprecated soon

    return features, labels


def _label_mapping():
    return {2: 'I. setosa', 1: 'I. virginica', 0: 'I. versicolor'}


def model(num_features, training_steps, learning_rate, log_dir, training_data, evaluation_data):
    feature_dict = {"x": tf.float32}
    num_classes = 3
    target_type = tf.int32

    # Define the feature columns for inputs.
    feature_columns = [
        tf.feature_column.numeric_column("x", shape=num_features)
    ]

    serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
        tf.feature_column.make_parse_example_spec(feature_columns))

    # Create the Estimator
    training_config = tf.estimator.RunConfig(
        save_summary_steps=10,
        save_checkpoints_steps=10)

    classifier = tf.estimator.DNNClassifier(
        config=training_config,
        feature_columns=feature_columns,
        hidden_units=[10, 20, 10],
        n_classes=num_classes,
        model_dir=log_dir
    )

    train_spec = tf.estimator.TrainSpec(
        input_fn=lambda: uds.get_tf_dataset(feature_dict, training_data, num_features,
                                            target_type).repeat(count=None).shuffle(100).batch(100),
        max_steps=training_steps)

    latest_exporter = tf.estimator.LatestExporter(
        name="models",
        serving_input_receiver_fn=serving_input_receiver_fn,
        exports_to_keep=10)

    eval_spec = tf.estimator.EvalSpec(
        input_fn=lambda: uds.get_tf_dataset(feature_dict, evaluation_data, num_features,
                                            target_type).repeat(count=None).shuffle(100).batch(100),
        exporters=latest_exporter
    )

    return classifier, train_spec, eval_spec, ["x"], feature_dict, target_type, serving_input_receiver_fn


def main(args):
    # Get required training parameters
    tid = args['train_id']
    model_id = args['model']
    virtual_dataset_id = args['vds']

    learning_rate = args['parameters']['lr']
    evaluation_dataset_percent = args['parameters']['eval_percent']
    testing_dataset_percent = args['parameters']['test_percent']
    training_steps = args['parameters']['steps']

    if testing_dataset_percent <= 0 or evaluation_dataset_percent <= 0 or (
            testing_dataset_percent + evaluation_dataset_percent >= 1.0):
        train.status(tid, 5, "testing_data_percent and/or evaluation_data_percent was set correctly set")
        log.debug(f"Model aborted: test data percent: {testing_dataset_percent}")
        return

    # Get required params for model management and logging
    log_dir = str(args['exportdir'])
    graph_version = args['graphversion']

    # Classification_mode should be "tf_premade_multiclass"
    classification_mode = args['parameters']['classification_mode']

    try:
        train.status(tid, 0)

        # Prepare vds data for modeling
        delayed_values_training, delayed_values_evaluation, delayed_values_testing, my_df_testing_label, num_features = \
            uds.train_eval_test_split_tensorflow(virtual_dataset_id, evaluation_dataset_percent,
                                                 testing_dataset_percent, process_data)

        _estimator, train_spec, eval_spec, feature_labels, feature_dict, target_type, serving_input_receiver_fn = \
            model(num_features, training_steps, learning_rate, log_dir, delayed_values_training,
                  delayed_values_evaluation)
    except Exception as exception:
        train.status(tid, 5, exception)
        log.debug(f"Caught exception: {exception}")
        raise

    try:
        train.status(tid, 1)

        _tuple = tf.estimator.train_and_evaluate(_estimator, train_spec=train_spec, eval_spec=eval_spec)
        performance_tuple = _tuple[0]

        perf_list = []
        for k, v in performance_tuple.items():
            perf_list.append(k + ':' + str(v))

        train.status(tid, 2)

        # Get predictions on the test set
        predictions = lml.get_predictions_classification(_estimator,
                                                         lambda: uds.get_tf_dataset(feature_dict,
                                                                                    delayed_values_testing,
                                                                                    num_features,
                                                                                    target_type).batch(1),
                                                         classification_mode, .5)

        actual = (my_df_testing_label.compute()).iloc[:, 0].tolist()

        label_mapping = _label_mapping()

        # Compute confusion matrix
        cm_string = lml.confusion_matrix(predictions, actual, label_mapping)

        # Compute average precision, recall, and f1 score for multi-class (not multi-label)
        actual_np = np.array(actual)
        predictions_np = np.array(predictions)
        precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                     average='macro')
        precision_micro, recall_micro, f1_micro, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                     average='micro')
        precision_weighted, recall_weighted, f1_weighted, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                              average='weighted')

        # Compute precision, recall, and f1 score per label
        labels = list(set(actual))
        results = precision_recall_fscore_support(actual_np, predictions_np, average=None, labels=labels)
        results_string = ''
        for i in range(0, len(labels)):
            stat_list = []
            # results is a (precision, recall, f1, support) tuple of per-label arrays;
            # collect the i-th entry of each metric for this label
            for metric in results:
                stat_list.append(str(metric[i]))
            stat_string = ','.join(stat_list)
            raw_label = labels[i]
            results_string += (label_mapping[raw_label] + '-')
            results_string += (stat_string + ';')

        log.info("zipping model up...")
        model_filename = uds.zip_tf_model(_estimator, serving_input_receiver_fn, model_id, graph_version, log_dir)
        log.debug("zipped model " + str(model_filename))

        # Store model graph and performance stats back to Lucd back-end
        with open(model_filename, "rb") as graph_file:
            accuracy = float(performance_tuple['accuracy'])
            final_loss = float(performance_tuple['loss'])
            train.update({tid: {
                'performance': {
                    'loss': final_loss,
                    'accuracy': accuracy,
                    'confusion_matrix': cm_string,
                    'macro_precision': precision_macro,
                    'macro_recall': recall_macro,
                    'macro_f1': f1_macro,
                    'micro_precision': precision_micro,
                    'micro_recall': recall_micro,
                    'micro_f1': f1_micro,
                    'weighted_precision': precision_weighted,
                    'weighted_recall': recall_weighted,
                    'weighted_f1': f1_weighted,
                    'precision_recall_f1_per_label': results_string,
                },
                'graph_version': graph_version,
                'graph_file': graph_file.read()
            }})

            log.debug("storing graph")
            log.debug("model id: " + str(model_id))

        train.status(tid, 3)

        # Store checkpoint files
        log.info("attempting checkpoint")
        uds.store_checkpoint(model_id, graph_version, virtual_dataset_id, log_dir)

        train.status(tid, 4)
    except Exception as exception:
        log.info(f"Caught exception: {exception}")
        train.status(tid, 5, "failure: error in training -- " + str(exception))
        raise

Classification with Multiple Input Feature Types

from eda.int import model
from eda import log
from eda.lib import lucd_uds as uds
from eda.lib import lucd_ml as lml
from eda.int import train

import tensorflow as tf
import numpy as np
from sklearn.metrics import precision_recall_fscore_support


def process_data(features, labels):
    # Can be passed to lucd_uds.train_eval_test_split_tensorflow for feature transformation, but will be
    # deprecated soon

    features['flower_mean'] = features['flower_mean'].replace([False, True], ["False", "True"])

    return features, labels


def _label_mapping():
    return {2: 'I. setosa', 1: 'I. virginica', 0: 'I. versicolor'}


def model(num_features, training_steps, learning_rate, log_dir, training_data, evaluation_data):
    feature_dict = {"flower.petal_length": tf.float64,
                    "flower.petal_width": tf.float64,
                    "flower.sepal_length": tf.float64,
                    "flower.sepal_width": tf.float64,
                    "flower_mean": tf.string
                    }

    target_type = tf.int32

    num_classes = 3

    # Define the feature columns for inputs.
    feature_columns = [
        tf.feature_column.numeric_column(key="flower.petal_length"),
        tf.feature_column.numeric_column(key="flower.petal_width"),
        tf.feature_column.numeric_column(key="flower.sepal_length"),
        tf.feature_column.numeric_column(key="flower.sepal_width"),
        tf.feature_column.indicator_column(
            tf.feature_column.categorical_column_with_vocabulary_list(key='flower_mean',
                                                                      vocabulary_list=['True', 'False'])
        )
    ]

    serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
        tf.feature_column.make_parse_example_spec(feature_columns))

    # Create the Estimator
    training_config = tf.estimator.RunConfig(
        save_summary_steps=10,
        save_checkpoints_steps=10)

    classifier = tf.estimator.DNNClassifier(
        config=training_config,
        feature_columns=feature_columns,
        hidden_units=[10, 20, 10],
        n_classes=num_classes,
        model_dir=log_dir
    )

    train_spec = tf.estimator.TrainSpec(
        input_fn=lambda: uds.get_tf_dataset(feature_dict, training_data, num_features,
                                            target_type).repeat(count=None).shuffle(100).batch(100),
        max_steps=training_steps)

    latest_exporter = tf.estimator.LatestExporter(
        name="models",
        serving_input_receiver_fn=serving_input_receiver_fn,
        exports_to_keep=10)

    eval_spec = tf.estimator.EvalSpec(
        input_fn=lambda: uds.get_tf_dataset(feature_dict, evaluation_data, num_features,
                                            target_type).repeat(count=None).shuffle(100).batch(100),
        exporters=latest_exporter)

    return classifier, train_spec, eval_spec, feature_dict, target_type, serving_input_receiver_fn


def main(args):
    # Get required training parameters
    tid = args['train_id']
    model_id = args['model']
    virtual_dataset_id = args['vds']

    learning_rate = args['parameters']['lr']
    evaluation_dataset_percent = args['parameters']['eval_percent']
    testing_dataset_percent = args['parameters']['test_percent']
    training_steps = args['parameters']['steps']

    if testing_dataset_percent <= 0 or evaluation_dataset_percent <= 0 or (
            testing_dataset_percent + evaluation_dataset_percent >= 1.0):
        train.status(tid, 5, "testing_data_percent and/or evaluation_data_percent was set correctly set")
        log.debug(f"Model aborted: test data percent: {testing_dataset_percent}")
        return

    # Get required params for model management and logging
    log_dir = str(args['exportdir'])
    graph_version = args['graphversion']

    # Classification_mode should be "tf_premade_multiclass"
    classification_mode = args['parameters']['classification_mode']

    try:
        train.status(tid, 0)

        # Prepare vds data for modeling
        delayed_values_training, delayed_values_evaluation, delayed_values_testing, my_df_testing_label, num_features = \
            uds.train_eval_test_split_tensorflow(virtual_dataset_id, evaluation_dataset_percent,
                                                 testing_dataset_percent, process_data)

        _estimator, train_spec, eval_spec, feature_dict, target_type, serving_input_receiver_fn = \
            model(num_features, training_steps, learning_rate, log_dir, delayed_values_training,
                  delayed_values_evaluation)
    except Exception as exception:
        train.status(tid, 5, exception)
        log.debug(f"Caught exception: {exception}")
        raise

    try:
        train.status(tid, 1)

        _tuple = tf.estimator.train_and_evaluate(_estimator, train_spec=train_spec, eval_spec=eval_spec)
        performance_tuple = _tuple[0]

        perf_list = []
        for k, v in performance_tuple.items():
            perf_list.append(k + ':' + str(v))

        train.status(tid, 2)

        # Get predictions on the test set
        predictions = lml.get_predictions_classification(_estimator,
                                                         lambda: uds.get_tf_dataset(feature_dict,
                                                                                    delayed_values_testing,
                                                                                    num_features,
                                                                                    target_type).batch(1),
                                                         classification_mode, .5)

        actual = (my_df_testing_label.compute()).iloc[:, 0].tolist()

        label_mapping = _label_mapping()

        # Compute confusion matrix
        cm_string = lml.confusion_matrix(predictions, actual, label_mapping)

        # Compute average precision, recall, and f1 score for multi-class (not multi-label)
        actual_np = np.array(actual)
        predictions_np = np.array(predictions)
        precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                     average='macro')
        precision_micro, recall_micro, f1_micro, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                     average='micro')
        precision_weighted, recall_weighted, f1_weighted, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                              average='weighted')

        # Compute precision, recall, and f1 score per label
        labels = list(set(actual))
        results = precision_recall_fscore_support(actual_np, predictions_np, average=None, labels=labels)
        results_string = ''
        for i in range(0, len(labels)):
            stat_list = []
            # results is a (precision, recall, f1, support) tuple of per-label arrays;
            # collect the i-th entry of each metric for this label
            for metric in results:
                stat_list.append(str(metric[i]))
            stat_string = ','.join(stat_list)
            raw_label = labels[i]
            results_string += (label_mapping[raw_label] + '-')
            results_string += (stat_string + ';')

        log.info("zipping model up...")
        model_filename = uds.zip_tf_model(_estimator, serving_input_receiver_fn, model_id, graph_version, log_dir)
        log.debug("zipped model " + str(model_filename))

        # Store model graph and performance stats back to Lucd back-end
        with open(model_filename, "rb") as graph_file:
            accuracy = float(performance_tuple['accuracy'])
            final_loss = float(performance_tuple['loss'])
            train.update({tid: {
                'performance': {
                    'loss': final_loss,
                    'accuracy': accuracy,
                    'confusion_matrix': cm_string,
                    'macro_precision': precision_macro,
                    'macro_recall': recall_macro,
                    'macro_f1': f1_macro,
                    'micro_precision': precision_micro,
                    'micro_recall': recall_micro,
                    'micro_f1': f1_micro,
                    'weighted_precision': precision_weighted,
                    'weighted_recall': recall_weighted,
                    'weighted_f1': f1_weighted,
                    'precision_recall_f1_per_label': results_string,
                },
                'graph_version': graph_version,
                'graph_file': graph_file.read()
            }})

            log.debug("storing graph")
            log.debug("model id: " + str(model_id))

        train.status(tid, 3)

        # Store checkpoint files
        log.info("attempting checkpoint")
        uds.store_checkpoint(model_id, graph_version, virtual_dataset_id, log_dir)

        train.status(tid, 4)
    except Exception as exception:
        log.info(f"Caught exception: {exception}")
        train.status(tid, 5, "failure: error in training -- " + str(exception))
        raise

Text Classification

from eda import log
from eda.lib import lucd_uds as uds
from eda.lib import lucd_ml as lml
from eda.int import asset
from eda.int import train

from tensorflow import keras
import tensorflow as tf

import numpy as np
from sklearn.metrics import precision_recall_fscore_support


feature_label = "embedding_input"


def process_data(features, labels):
    # Can be passed to lucd_uds.train_eval_test_split_tensorflow for feature transformation, but will be
    # deprecated soon

    labels = labels.replace(["negative", "positive"], [0, 1])

    return features, labels


def _label_mapping():
    return {0: 'negative', 1: 'positive'}


def model(training_data, evaluation_data, training_steps, log_dir, embedding_matrix, embedding_size, word_index_mapping,
          max_document_length, pad_value):
    target_type = tf.int32

    # Define the feature columns for inputs.
    feature_columns = [
        tf.feature_column.numeric_column(feature_label, shape=[max_document_length])
    ]

    serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
        tf.feature_column.make_parse_example_spec(feature_columns))

    keras_model = keras.Sequential()
    keras_model.add(keras.layers.Embedding(len(embedding_matrix), embedding_size,
                                           weights=[embedding_matrix], trainable=False))
    keras_model.add(keras.layers.GlobalAveragePooling1D())
    keras_model.add(keras.layers.Dense(100, activation=tf.nn.relu))
    keras_model.add(keras.layers.Dense(1, activation=tf.nn.sigmoid))

    keras_model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

    training_config = tf.estimator.RunConfig(
        save_summary_steps=10,
        save_checkpoints_steps=10)

    est_model = tf.keras.estimator.model_to_estimator(keras_model=keras_model, model_dir=log_dir,
                                                      config=training_config)

    train_spec = tf.estimator.TrainSpec(
        input_fn=lambda: uds.get_tf_dataset_text(feature_label, training_data, word_index_mapping, pad_value,
                                                 max_document_length,
                                                 target_type).repeat(count=None).shuffle(100).batch(int(100)),
        max_steps=training_steps)

    latest_exporter = tf.estimator.LatestExporter(
        name="models",
        serving_input_receiver_fn=serving_input_receiver_fn,
        exports_to_keep=10)

    eval_spec = tf.estimator.EvalSpec(
        input_fn=lambda: uds.get_tf_dataset_text(feature_label, evaluation_data, word_index_mapping, pad_value,
                                                 max_document_length,
                                                 target_type).repeat(count=None).shuffle(100).batch(int(100)),
        steps=10,
        exporters=latest_exporter)

    return est_model, train_spec, eval_spec, target_type, serving_input_receiver_fn


def main(args):
    # Get required training parameters
    model_id = args['model']
    virtual_dataset_id = args['vds']
    asset_id = args['asset']
    tid = args['train_id']

    # Get asset/embedding
    obj, _ = asset.read({"uid": asset_id})

    max_document_length = args['parameters']['max_document_length']

    # Classification_mode should be "binary"
    classification_mode = args['parameters']['classification_mode']

    training_steps = args['parameters']['steps']
    learning_rate = args['parameters']['lr']
    evaluation_dataset_percent = args['parameters']['eval_percent']
    testing_dataset_percent = args['parameters']['test_percent']

    # Get required params for model management and logging
    log_dir = str(args['exportdir'])
    graph_version = args['graphversion']

    try:
        train.status(tid, 0)

        # Prepare vds data for modeling
        delayed_values_training, delayed_values_evaluation, delayed_values_testing, testing_labels, _ = \
            uds.train_eval_test_split_tensorflow(virtual_dataset_id, evaluation_dataset_percent,
                                                 testing_dataset_percent, process_data)

        # Get asset/embedding
        embeddings_index, embedding_matrix, embedding_size, word_index_mapping, pad_value = uds.get_asset(asset_id)

        log.info(word_index_mapping)
    except Exception as exception:
        train.status(tid, 5, exception)
        log.debug(f"Caught exception: {exception}")
        raise

    try:
        _estimator, train_spec, eval_spec, target_type, serving_input_fn = \
            model(delayed_values_training, delayed_values_evaluation, training_steps, log_dir, embedding_matrix,
                  embedding_size, word_index_mapping, max_document_length, pad_value)
    except Exception as exception:
        log.debug(f"Caught Exception (starting user model): {exception}")
        train.status(tid, 5, "Training failed: problem processing user model")
        raise

    try:
        train.status(tid, 1)

        _tuple = tf.estimator.train_and_evaluate(_estimator, train_spec=train_spec, eval_spec=eval_spec)
        performance_tuple = _tuple[0]

        perf_list = []
        for k, v in performance_tuple.items():
            perf_list.append(k + ':' + str(v))

        train.status(tid, 2)

        # Get predictions on the test set
        predictions = lml.get_predictions_classification(_estimator,
                                                         lambda: uds.get_tf_dataset_text(feature_label,
                                                                                         delayed_values_testing,
                                                                                         word_index_mapping, pad_value,
                                                                                         max_document_length,
                                                                                         target_type).batch(1),
                                                         classification_mode, .5)

        actual = (testing_labels.compute()).iloc[:, 0].tolist()
        for index, item in enumerate(actual):
            if item == 'negative':
                actual[index] = 0
            else:
                actual[index] = 1

        label_mapping = _label_mapping()

        # Compute confusion matrix
        cm_string = lml.confusion_matrix(predictions, actual, label_mapping)

        # Compute average precision, recall, and f1 score for multi-class (not multi-label)
        actual_np = np.array(actual)
        predictions_np = np.array(predictions)
        precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                     average='macro')
        precision_micro, recall_micro, f1_micro, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                     average='micro')
        precision_weighted, recall_weighted, f1_weighted, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                              average='weighted')

        # Compute precision, recall, and f1 score per label
        labels = list(set(actual))
        results = precision_recall_fscore_support(actual_np, predictions_np, average=None, labels=labels)
        results_string = ''
        for i in range(0, len(labels)):
            stat_list = []
            # results is a (precision, recall, f1, support) tuple of per-label arrays;
            # collect the i-th entry of each metric for this label
            for metric in results:
                stat_list.append(str(metric[i]))
            stat_string = ','.join(stat_list)
            raw_label = labels[i]
            results_string += (label_mapping[raw_label] + '-')
            results_string += (stat_string + ';')

        log.info("zipping model up...")
        model_filename = uds.zip_tf_model(_estimator, serving_input_fn, model_id, graph_version, log_dir)
        log.debug("zipped model " + str(model_filename))

        # Store model graph and performance stats back to Lucd back-end
        with open(model_filename, "rb") as graph_file:
            accuracy = float(performance_tuple['accuracy'])
            final_loss = float(performance_tuple['loss'])
            train.update({tid: {
                'performance': {
                    'loss': final_loss,
                    'accuracy': accuracy,
                    'confusion_matrix': cm_string,
                    'macro_precision': precision_macro,
                    'macro_recall': recall_macro,
                    'macro_f1': f1_macro,
                    'micro_precision': precision_micro,
                    'micro_recall': recall_micro,
                    'micro_f1': f1_micro,
                    'weighted_precision': precision_weighted,
                    'weighted_recall': recall_weighted,
                    'weighted_f1': f1_weighted,
                    'precision_recall_f1_per_label': results_string,
                },
                'graph_version': graph_version,
                'graph_file': graph_file.read(),
                'ordered_feature_names': 'embedding_input,',
                'ordered_class_names': 'negative, positive,'
            }})

            log.debug("storing graph")
            log.debug("model id: " + str(model_id))

        train.status(tid, 3)

        # Store checkpoint files
        log.info("attempting checkpoint")
        uds.store_checkpoint(model_id, graph_version, virtual_dataset_id, log_dir)

        train.status(tid, 4)
    except Exception as exception:
        log.info(f"Caught exception: {exception}")
        train.status(tid, 5, "failure: error in training -- " + str(exception))
        raise

Text Classification with Multiple (Greater than 2) Classes

from eda import log
from eda.lib import lucd_uds as uds
from eda.lib import lucd_ml as lml
from eda.int import asset
from eda.int import train

from tensorflow import keras
import tensorflow as tf

import numpy as np
from sklearn.metrics import precision_recall_fscore_support

feature_label = "embedding_input"


def process_data(features, labels):
    # Can be passed to lucd_uds.train_eval_test_split_tensorflow for feature transformation, but will be
    # deprecated soon

    return features, labels


def _label_mapping():
    return {0: '0', 1: '1', 2: '2', 3: '3', 4: '4', 5: '5'}


def model(training_data, evaluation_data, training_steps, log_dir, embedding_matrix, embedding_size, word_index_mapping,
          max_document_length, pad_value):
    target_type = tf.int32

    # Define the feature columns for inputs.
    feature_columns = [
        tf.feature_column.numeric_column(feature_label, shape=[max_document_length])
    ]

    serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
        tf.feature_column.make_parse_example_spec(feature_columns))

    keras_model = keras.Sequential()
    keras_model.add(keras.layers.Embedding(len(embedding_matrix), embedding_size, weights=[embedding_matrix],
                                           trainable=False))
    keras_model.add(keras.layers.GRU(units=32, dropout=0.2, recurrent_dropout=0.2))
    keras_model.add(keras.layers.Dense(6, activation=tf.nn.softmax))

    keras_model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    training_config = tf.estimator.RunConfig(
        save_summary_steps=10,
        save_checkpoints_steps=10)

    est_model = tf.keras.estimator.model_to_estimator(keras_model=keras_model, model_dir=log_dir,
                                                      config=training_config)

    train_spec = tf.estimator.TrainSpec(
        input_fn=lambda: uds.get_tf_dataset_text(feature_label, training_data, word_index_mapping, pad_value,
                                                 max_document_length,
                                                 target_type, 6).repeat(count=None).shuffle(100).batch(int(100)),
        max_steps=training_steps)

    latest_exporter = tf.estimator.LatestExporter(
        name="models",
        serving_input_receiver_fn=serving_input_receiver_fn,
        exports_to_keep=10)

    eval_spec = tf.estimator.EvalSpec(
        input_fn=lambda: uds.get_tf_dataset_text(feature_label, evaluation_data, word_index_mapping, pad_value,
                                                 max_document_length,
                                                 target_type, 6).repeat(count=None).shuffle(100).batch(int(100)),
        steps=10,
        exporters=latest_exporter)

    class_names = ['0', '1', '2', '3', '4', '5']

    return est_model, train_spec, eval_spec, target_type, class_names, serving_input_receiver_fn


def main(args):
    # Get required training parameters
    model_id = args['model']
    virtual_dataset_id = args['vds']
    asset_id = args['asset']
    tid = args['train_id']

    # Get asset/embedding
    obj, _ = asset.read({"uid": asset_id})

    max_document_length = args['parameters']['max_document_length']
    classification_mode = args['parameters']['classification_mode']

    training_steps = args['parameters']['steps']
    learning_rate = args['parameters']['lr']
    evaluation_dataset_percent = args['parameters']['eval_percent']
    testing_dataset_percent = args['parameters']['test_percent']

    # Get required params for model management and logging
    log_dir = str(args['exportdir'])
    graph_version = args['graphversion']

    try:
        train.status(tid, 0)

        # Prepare vds data for modeling
        delayed_values_training, delayed_values_evaluation, delayed_values_testing, testing_labels, _ = \
            uds.train_eval_test_split_tensorflow(virtual_dataset_id, evaluation_dataset_percent,
                                                 testing_dataset_percent, process_data)

        # Get asset/embedding
        embeddings_index, embedding_matrix, embedding_size, word_index_mapping, pad_value = uds.get_asset(asset_id)

        log.info(word_index_mapping)
    except Exception as exception:
        train.status(tid, 5, exception)
        log.debug(f"Caught exception: {exception}")
        raise

    try:
        _estimator, train_spec, eval_spec, target_type, class_names, serving_input_fn = \
            model(delayed_values_training, delayed_values_evaluation, training_steps, log_dir, embedding_matrix,
                  embedding_size, word_index_mapping, max_document_length, pad_value)
    except Exception as exception:
        log.debug(f"Caught Exception (starting user model): {exception}")
        train.status(tid, 5, "Training failed: problem processing user model")
        raise

    try:
        train.status(tid, 1)

        _tuple = tf.estimator.train_and_evaluate(_estimator, train_spec=train_spec, eval_spec=eval_spec)
        performance_tuple = _tuple[0]

        perf_list = []
        for k, v in performance_tuple.items():
            perf_list.append(k + ':' + str(v))

        train.status(tid, 2)

        # Get predictions on the test set
        predictions = lml.get_predictions_classification(_estimator,
                                                         lambda: uds.get_tf_dataset_text(feature_label,
                                                                                         delayed_values_testing,
                                                                                         word_index_mapping, pad_value,
                                                                                         max_document_length,
                                                                                         target_type, 6).batch(1),
                                                         classification_mode, .5)

        actual = (testing_labels.compute()).iloc[:, 0].tolist()

        log.debug("\n\tLABEL MAPPING\n")
        label_mapping = _label_mapping()

        # Compute confusion matrix
        cm_string = lml.confusion_matrix(predictions, actual, label_mapping)

        # Compute average precision, recall, and f1 score for multi-class (not multi-label)
        actual_np = np.array(actual)
        predictions_np = np.array(predictions)
        precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                     average='macro')
        precision_micro, recall_micro, f1_micro, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                     average='micro')
        precision_weighted, recall_weighted, f1_weighted, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                              average='weighted')

        # Compute precision, recall, and f1 score per label
        labels = list(set(actual))
        results = precision_recall_fscore_support(actual_np, predictions_np, average=None, labels=labels)
        results_string = ''
        for i in range(0, len(labels)):
            stat_list = []
            # results is a (precision, recall, f1, support) tuple of per-label arrays;
            # collect the i-th entry of each metric for this label
            for metric in results:
                stat_list.append(str(metric[i]))
            stat_string = ','.join(stat_list)
            raw_label = labels[i]
            results_string += (label_mapping[raw_label] + '-')
            results_string += (stat_string + ';')

        log.info("zipping model up...")
        model_filename = uds.zip_tf_model(_estimator, serving_input_fn, model_id, graph_version, log_dir)
        log.debug("zipped model " + str(model_filename))

        class_names_string = ''
        for c in class_names:
            class_names_string = class_names_string + c + ','

        # Store model graph and performance stats back to Lucd back-end
        with open(model_filename, "rb") as graph_file:
            accuracy = float(performance_tuple['accuracy'])
            final_loss = float(performance_tuple['loss'])
            train.update({tid: {
                'performance': {
                    'loss': final_loss,
                    'accuracy': accuracy,
                    'confusion_matrix': cm_string,
                    'macro_precision': precision_macro,
                    'macro_recall': recall_macro,
                    'macro_f1': f1_macro,
                    'micro_precision': precision_micro,
                    'micro_recall': recall_micro,
                    'micro_f1': f1_micro,
                    'weighted_precision': precision_weighted,
                    'weighted_recall': recall_weighted,
                    'weighted_f1': f1_weighted,
                    'precision_recall_f1_per_label': results_string,
                },
                'graph_version': graph_version,
                'graph_file': graph_file.read(),
                'ordered_feature_names': 'embedding_input,',
                'ordered_class_names': class_names_string
            }})

            log.debug("storing graph")
            log.debug("model id: " + str(model_id))

        train.status(tid, 3)

        # Store checkpoint files
        log.info("attempting checkpoint")
        uds.store_checkpoint(model_id, graph_version, virtual_dataset_id, log_dir)

        train.status(tid, 4)
    except Exception as exception:
        log.info(f"Caught exception: {exception}")
        train.status(tid, 5, "failure: error in training -- " + str(exception))
        raise

Image Classification

This image classification example expects (28, 28, 1) images, the format of Fashion MNIST (https://github.com/zalandoresearch/fashion-mnist). For a multi-feature VDS, you will need to overwrite the num_features variable with a dict specifying the shape of each feature, after receiving it from train_eval_test_split_tensorflow and before passing it to the model function. You will also need to add a receiver tensor and receiver feature for each input, and use the Keras functional API instead of keras.Sequential (or a canned estimator that supports multiple features), as sketched below.
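
As a minimal sketch of the multi-feature case (the second feature name and its shape below are hypothetical, chosen only for illustration), overriding num_features and defining a two-feature serving input receiver might look like this:

# Hypothetical shapes for a two-feature VDS; overwrite num_features after
# calling train_eval_test_split_tensorflow and before calling model().
num_features = {"image.binary": (28, 28, 1), "metadata.vector": (10,)}


def serving_input_receiver_fn():
    # One placeholder (receiver tensor) per input feature.
    receiver_tensors = {
        'image.binary': tf.compat.v1.placeholder(dtype=tf.float32, shape=[None, 28, 28, 1]),
        'metadata.vector': tf.compat.v1.placeholder(dtype=tf.float32, shape=[None, 10])
    }

    # Pass each receiver tensor through as a model feature.
    return tf.estimator.export.ServingInputReceiver(receiver_tensors=receiver_tensors,
                                                    features=dict(receiver_tensors))

A Keras functional model would then declare one named Input per feature and merge the branches before the final dense layers.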

from eda import log
from eda.lib import lucd_uds as uds
from eda.int import train
from eda.lib import lucd_ml as ml
from eda.lib import lucd_img
import traceback
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.layers import Dense, Conv2D, AveragePooling2D, Flatten, InputLayer
import tensorflow.keras.losses as losses
import numpy as np
import pprint
import os
from sklearn.metrics import precision_recall_fscore_support


def process_data(features, labels):
    """
    Method called by the main Lucd TensorFlow training workflow for executing user-defined data processing routines,
    e.g., determine average sentence length, vocab dictionaries, etc.
    :param features: Pandas dataframe containing features.
    :param labels: Pandas dataframe containing labels.
    :return: Dataframes for processed features and labels.
    """

    # Note that in the Lucd eda section of the GUI, replacements can be done, rendering this operation unnecessary
    return features, labels


def _label_mapping():

    return {0: '0', 1: '1', 2: '2', 3: '3', 4: '4', 5: '5', 6: '6', 7: '7', 8: '8', 9: '9'}


def serving_input_receiver_fn():

    receiver_tensors = {
        'image.binary': tf.compat.v1.placeholder(dtype=tf.float32, shape=[None, 28, 28, 1])
    }

    receiver_features = {
        'image.binary': tf.concat([
            receiver_tensors['image.binary']
        ], axis=1)
    }

    return tf.estimator.export.ServingInputReceiver(receiver_tensors=receiver_tensors, features=receiver_features)


def model(num_features, training_steps, learning_rate, log_dir, training_data, evaluation_data, tid):

    num_classes = 1
    target_type = tf.int32
    type_dict = {"image.binary": tf.float64}

    # Create the Estimator
    training_config = tf.estimator.RunConfig(
        save_summary_steps=10,
        save_checkpoints_steps=10)

    train_spec = tf.estimator.TrainSpec(
        input_fn=lambda: uds.get_tf_dataset_image(type_dict, training_data, num_features, target_type,
                                                  num_classes).repeat(count=None).shuffle(30).batch(int(30)),
        max_steps=training_steps)

    latest_exporter = tf.estimator.LatestExporter(
        name="models",
        serving_input_receiver_fn=serving_input_receiver_fn,
        exports_to_keep=10)

    eval_spec = tf.estimator.EvalSpec(
        input_fn=lambda: uds.get_tf_dataset_image(type_dict, evaluation_data, num_features, target_type,
                                                  num_classes).repeat(count=None).shuffle(30).batch(int(30)),
        exporters=latest_exporter,
        steps=1,
        throttle_secs=0
    )

    # LeNet-5 in keras with 10 classes
    keras_model = keras.Sequential()
    keras_model.add(InputLayer(input_shape=(28, 28, 1), name='image.binary'))
    keras_model.add(Conv2D(filters=6, kernel_size=(3, 3), activation='relu'))
    keras_model.add(AveragePooling2D())
    keras_model.add(Conv2D(filters=16, kernel_size=(3, 3), activation='relu'))
    keras_model.add(AveragePooling2D())
    keras_model.add(Flatten())
    keras_model.add(Dense(units=120, activation='relu'))
    keras_model.add(Dense(units=84, activation='relu'))
    keras_model.add(Dense(units=10, activation='softmax'))

    keras_model.compile(optimizer=tf.keras.optimizers.Adam(lr=learning_rate),
                        loss="sparse_categorical_crossentropy",
                        metrics=['accuracy'])

    estimator = tf.keras.estimator.model_to_estimator(keras_model, model_dir=log_dir, config=training_config)

    return estimator, train_spec, eval_spec, target_type, type_dict


def main(args):

    # Get required training parameters
    tid = args['train_id']
    model_id = args['model']
    virtual_dataset_id = args['vds']
    learning_rate = args['parameters']['lr']
    evaluation_dataset_percent = args['parameters']['eval_percent']
    testing_dataset_percent = args['parameters']['test_percent']
    training_steps = args['parameters']['steps']

    if testing_dataset_percent <= 0 or evaluation_dataset_percent <= 0 or (
            testing_dataset_percent + evaluation_dataset_percent >= 1.0):
        train.status(tid, 5, "testing_data_percent and/or evaluation_data_percent was set incorrectly")
        log.debug(f"Model aborted: test data percent: {testing_dataset_percent}")
        return

    # Get required params for model management and logging
    log_dir = str(args['exportdir'])
    graph_version = args['graphversion']
    mode = args['parameters']['classification_mode']

    # This needs to be a 1
    num_classes = 1

    try:
        train.status(tid, 0)

        # Prepare vds data for modeling
        delayed_values_training, delayed_values_evaluation, \
            delayed_values_testing, my_df_testing_label, num_features = \
            uds.train_eval_test_split_tensorflow(virtual_dataset_id, evaluation_dataset_percent,
                                                 testing_dataset_percent, process_data)

        _estimator, train_spec, eval_spec, target_type, type_dict = \
            model(num_features, training_steps, learning_rate, log_dir, delayed_values_training,
                  delayed_values_evaluation, tid)

    except Exception as exception:
        error = traceback.format_exc()
        train.status(tid, 5, f'{exception} : {error}')
        log.debug(f"Caught exception: {exception}, {error}")
        raise

    try:
        train.status(tid, 1)

        # Train and evaluate the Keras-derived estimator
        _tuple = tf.estimator.train_and_evaluate(_estimator, train_spec=train_spec, eval_spec=eval_spec)

        # Note: _tuple[0] is empty (and raises an error) if a checkpoint already exists in model_dir
        performance_tuple = _tuple[0]

        train.status(tid, 2)

        # Get predictions on the test set
        predictions = ml.get_predictions_classification(_estimator,
                                                        lambda: uds.get_tf_dataset_image(type_dict,
                                                                                         delayed_values_testing,
                                                                                         num_features,
                                                                                         target_type,
                                                                                         num_classes).batch(1),
                                                        mode)

        log.debug(predictions)

        actual = (my_df_testing_label.compute()).iloc[:, 0].tolist()

        for index, item in enumerate(actual):
            actual[index] = int(item)

        label_mapping = _label_mapping()

        # Compute confusion matrix
        cm_string = ml.confusion_matrix(predictions, actual, label_mapping)

        # compute average precision, recall, and f1 score for multi-class (not multi-label)
        actual_np = np.array(actual)
        predictions_np = np.array(predictions)
        precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                     average='macro')
        precision_micro, recall_micro, f1_micro, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                     average='micro')
        precision_weighted, recall_weighted, f1_weighted, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                              average='weighted')

        # compute precision, recall, and f1 score per label
        labels = list(set(actual))
        results = precision_recall_fscore_support(actual_np, predictions_np, average=None, labels=labels)

        results_string = ''
        for i in range(0, len(labels)):
            stat_list = []
            for metric in results:
                stat_list.append(str(metric[i]))

            stat_string = ','.join(stat_list)
            raw_label = labels[i]
            results_string += (label_mapping[raw_label] + '-')
            results_string += (stat_string + ';')

        log.info("zipping model up...")
        model_filename = uds.zip_tf_model(_estimator, serving_input_receiver_fn, model_id, graph_version, log_dir)
        log.debug("zipped model " + str(model_filename))

        print("performance tuple: ")
        print(performance_tuple)

        # Store model graph and performance stats back to Lucd back-end
        with open(model_filename, "rb") as graph_file:
            accuracy = float(performance_tuple['accuracy'])
            final_loss = float(performance_tuple['loss'])
            train.update({tid: {
                'performance': {
                    'loss': final_loss,
                    'accuracy': accuracy,
                    'confusion_matrix': cm_string,
                    'macro_precision': precision_macro,
                    'macro_recall': recall_macro,
                    'macro_f1': f1_macro,
                    'micro_precision': precision_micro,
                    'micro_recall': recall_micro,
                    'micro_f1': f1_micro,
                    'weighted_precision': precision_weighted,
                    'weighted_recall': recall_weighted,
                    'weighted_f1': f1_weighted,
                    'precision_recall_f1_per_label': results_string,
                },
                'graph_version': graph_version,
                'graph_file': graph_file.read()
            }})

            log.debug("storing graph")
            log.debug("model id: " + str(model_id))

        train.status(tid, 3)

        # Store checkpoint files
        log.info("attempting checkpoint")
        uds.store_checkpoint(model_id, graph_version, virtual_dataset_id, log_dir)
        train.status(tid, 4)

    except Exception as exception:
        error = traceback.format_exc()
        log.debug(f"Caught exception: {exception}, {error}")
        train.status(tid, 5, "failure: error in training -- " + str(exception) + " : " + str(error))
        raise

Simple Regression

from eda.int import model
from eda import log
from eda.lib import lucd_uds as uds
from eda.lib import lucd_ml as lml
from eda.int import train

import tensorflow as tf
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score


def process_data(features, labels):
    # Can be passed to lucd_uds.train_eval_test_split_tensorflow for feature transformation, but will be
    # deprecated soon

    return features, labels


def model(num_features, training_steps, learning_rate, log_dir, training_data, evaluation_data):
    feature_dict = {"x": tf.float32}
    target_type = tf.int32

    # Define the feature columns for inputs.
    feature_columns = [
        tf.feature_column.numeric_column("x", shape=num_features)
    ]

    serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
        tf.feature_column.make_parse_example_spec(feature_columns))

    # Create the Estimator
    training_config = tf.estimator.RunConfig(
        save_summary_steps=10,
        save_checkpoints_steps=10)

    classifier = tf.estimator.LinearRegressor(config=training_config,
                                              feature_columns=feature_columns,
                                              model_dir=log_dir)

    train_spec = tf.estimator.TrainSpec(
        input_fn=lambda: uds.get_tf_dataset(feature_dict, training_data, num_features,
                                            target_type).repeat(count=None).shuffle(100).batch(100),
        max_steps=training_steps)

    latest_exporter = tf.estimator.LatestExporter(
        name="models",
        serving_input_receiver_fn=serving_input_receiver_fn,
        exports_to_keep=10)

    eval_spec = tf.estimator.EvalSpec(
        input_fn=lambda: uds.get_tf_dataset(feature_dict, evaluation_data, num_features,
                                            target_type).repeat(count=None).shuffle(100).batch(100),
        exporters=latest_exporter
    )

    return classifier, train_spec, eval_spec, ["x"], feature_dict, target_type, serving_input_receiver_fn


def main(args):
    # Get required training parameters
    tid = args['train_id']
    model_id = args['model']
    virtual_dataset_id = args['vds']

    learning_rate = args['parameters']['lr']
    evaluation_dataset_percent = args['parameters']['eval_percent']
    testing_dataset_percent = args['parameters']['test_percent']
    training_steps = args['parameters']['steps']

    if testing_dataset_percent <= 0 or evaluation_dataset_percent <= 0 or (
            testing_dataset_percent + evaluation_dataset_percent >= 1.0):
        train.status(tid, 5, "testing_data_percent and/or evaluation_data_percent was set correctly set")
        log.debug(f"Model aborted: test data percent: {testing_dataset_percent}")
        return

    # Get required params for model management and logging
    log_dir = str(args['exportdir'])
    graph_version = args['graphversion']

    classification_mode = args['parameters']['classification_mode']

    try:
        train.status(tid, 0)

        # Prepare vds data for modeling
        delayed_values_training, delayed_values_evaluation, delayed_values_testing, my_df_testing_label, num_features = \
            uds.train_eval_test_split_tensorflow(virtual_dataset_id, evaluation_dataset_percent,
                                                 testing_dataset_percent, process_data)

        _estimator, train_spec, eval_spec, feature_labels, feature_dict, target_type, serving_input_receiver_fn = \
            model(num_features, training_steps, learning_rate, log_dir, delayed_values_training,
                  delayed_values_evaluation)
    except Exception as exception:
        train.status(tid, 5, exception)
        log.debug(f"Caught exception: {exception}")
        raise

    try:
        train.status(tid, 1)

        _tuple = tf.estimator.train_and_evaluate(_estimator, train_spec=train_spec, eval_spec=eval_spec)
        performance_tuple = _tuple[0]

        perf_list = []
        for k, v in performance_tuple.items():
            perf_list.append(k + ':' + str(v))

        train.status(tid, 2)

        predictions = lml.get_predictions_regression(_estimator,
                                                     lambda: uds.get_tf_dataset(feature_dict,
                                                                                delayed_values_testing,
                                                                                num_features,
                                                                                target_type).batch(1)
                                                     )

        actual = (my_df_testing_label.compute()).iloc[:, 0].tolist()

        # Compute root mean squared error, mean absolute error, and R^2
        rmse = np.sqrt(mean_squared_error(actual, predictions))
        mae = mean_absolute_error(actual, predictions)
        r2 = r2_score(actual, predictions)

        log.info("zipping model up...")
        model_filename = uds.zip_tf_model(_estimator, serving_input_receiver_fn, model_id, graph_version, log_dir)
        log.debug("zipped model " + str(model_filename))

        # Store model graph and performance stats back to Lucd back-end
        with open(model_filename, "rb") as graph_file:
            train.update({tid: {
                'performance': {
                    'rmse': rmse,
                    'mae': mae,
                    'r2': r2
                },
                'graph_version': graph_version,
                'graph_filename': 'model.zip',
                'graph_file': graph_file.read(),
            }})
            log.debug("storing graph")
            log.debug("model id: " + str(model_id))

        train.status(tid, 3)

        # Store checkpoint files
        log.info("attempting checkpoint")
        uds.store_checkpoint(model_id, graph_version, virtual_dataset_id, log_dir)

        train.status(tid, 4)
    except Exception as exception:
        log.info(f"Caught exception: {exception}")
        train.status(tid, 5, "failure: error in training -- " + str(exception))
        raise

PyTorch

Simple Classification

from eda.lib import lucd_uds
from eda import log

from torch.utils.data import DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F

from eda.int import train

torch.manual_seed(1234)


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(4, 10)
        self.fc2 = nn.Linear(10, 3)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)

        return x


def process_data(features, targets):
    # Can be passed to lucd_uds.train_eval_test_split_pytorch for feature transformation, but will be
    # deprecated soon

    return features, targets


def main(args):
    # Get required training parameters
    tid = args['train_id']
    model_id = args['model']
    virtual_dataset_id = args['vds']

    learning_rate = args['parameters']['lr']
    evaluation_dataset_percent = args['parameters']['eval_percent']
    testing_dataset_percent = args['parameters']['test_percent']
    training_steps = args['parameters']['steps']

    # Get required params for model management and logging
    log_dir = str(args['exportdir'])
    graph_version = args['graphversion']

    try:
        train.status(tid, 0)

        train_df, eval_df, test_df = lucd_uds.train_eval_test_split_pytorch(virtual_dataset_id,
                                                                            evaluation_dataset_percent,
                                                                            testing_dataset_percent, 'Species',
                                                                            'classification', process_data)

        # Create iterator objects for train and valid datasets
        trainloader = DataLoader(train_df, batch_size=10)
        validloader = DataLoader(eval_df, batch_size=10)
    except Exception as exception:
        train.status(tid, 5, exception)
        log.debug(f"Caught exception: {exception}")
        raise

    try:
        train.status(tid, 1)

        # choose optimizer and loss function
        net = Net()
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.SGD(net.parameters(), lr=learning_rate)

        # train
        final_loss = 0.0
        for epoch in range(training_steps):
            train_loss, valid_loss = [], []
            tl = 0
            for data in trainloader:
                net.train()
                optimizer.zero_grad()
                out = net(data['Species'])
                loss = criterion(out, data['target'])
                loss.backward()
                optimizer.step()
                train_loss.append(loss.item())
                tl += loss.item()

            # Record the epoch's mean training loss
            final_loss = tl / len(trainloader)

        correct = 0
        total = 0
        with torch.no_grad():
            for data in validloader:
                outputs = net(data['Species'])
                _, predicted = torch.max(outputs.data, 1)
                total += data['target'].size(0)
                correct += (predicted == data['target']).sum().item()

        accuracy = 100 * correct / total

        log.info(f"validation accuracy: {accuracy}")
        log.info(f"final training loss: {final_loss}")

        # Save model
        torch.save(net.state_dict(), log_dir + '/iris_model.pt')

        # Store model graph and performance stats back to Lucd back-end
        with open(log_dir + '/iris_model.pt', "rb") as graph_file:
            log.debug("storing graph")
            log.debug("model id: " + str(model_id))

            train.update({tid: {
                'performance': {
                    'loss': final_loss,
                    'accuracy': accuracy
                },
                'graph_version': graph_version,
                'graph_file': graph_file.read()
            }})

        train.status(tid, 3)
    except Exception as exception:
        log.info(f"Caught exception: {exception}")
        train.status(tid, 5, "failure: error in training -- " + str(exception))
        raise
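
After training, the saved state_dict can be reloaded for local inference. A minimal sketch, assuming iris_model.pt has been copied from the export directory (the sample tensor is an illustrative 4-feature iris measurement):

import torch

net = Net()
net.load_state_dict(torch.load('iris_model.pt'))
net.eval()

with torch.no_grad():
    sample = torch.tensor([[5.1, 3.5, 1.4, 0.2]])  # one example: sepal/petal measurements
    logits = net(sample)
    predicted_class = torch.argmax(logits, dim=1).item()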

Text Classification

from eda.lib import lucd_uds
from eda import log

from torch.utils.data import DataLoader
import torch
import torch.nn as nn

from eda.int import train

import time

torch.manual_seed(1234)


class TextSentiment(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_class):
        super().__init__()
        self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=True)
        self.fc = nn.Linear(embed_dim, num_class)
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.fc.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.zero_()

    def forward(self, text, offsets):
        embedded = self.embedding(text, offsets)

        return self.fc(embedded)


def train_func(sub_train_, batch_size, model, optimizer, criterion, scheduler):
    # optimizer, criterion, and scheduler are created once in main() and passed in,
    # so optimizer state and the learning-rate decay carry across epochs
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Train the model
    train_loss = 0
    train_acc = 0
    data = DataLoader(sub_train_, batch_size=batch_size, shuffle=True, collate_fn=lucd_uds.generate_pytorch_text_batch)

    for i, (text, offsets, cls) in enumerate(data):
        optimizer.zero_grad()
        text, offsets, cls = text.to(device), offsets.to(device), cls.to(device)

        output = model(text, offsets)

        loss = criterion(output, cls)
        train_loss += loss.item()
        loss.backward()
        optimizer.step()
        train_acc += (output.argmax(1) == cls).sum().item()

    # Adjust the learning rate
    scheduler.step()

    return train_loss / len(sub_train_), train_acc / len(sub_train_)


def test(data_, batch_size, model):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    criterion = torch.nn.CrossEntropyLoss().to(device)

    loss = 0
    acc = 0
    data = DataLoader(data_, batch_size=batch_size, collate_fn=lucd_uds.generate_pytorch_text_batch)
    for text, offsets, cls in data:
        text, offsets, cls = text.to(device), offsets.to(device), cls.to(device)
        with torch.no_grad():
            output = model(text, offsets)
            batch_loss = criterion(output, cls)
            loss += batch_loss.item()
            acc += (output.argmax(1) == cls).sum().item()

    return loss / len(data_), acc / len(data_)


def process_data(features, targets):
    # Can be passed to lucd_uds.train_eval_test_split_pytorch for feature transformation, but will be
    # deprecated soon

    targets = targets.replace(["negative", "positive"], [0, 1])

    return features, targets


def main(args):
    # Get required training parameters
    tid = args['train_id']
    model_id = args['model']
    virtual_dataset_id = args['vds']
    asset_id = args['asset']

    learning_rate = args['parameters']['lr']
    evaluation_dataset_percent = args['parameters']['eval_percent']
    testing_dataset_percent = args['parameters']['test_percent']
    training_steps = args['parameters']['steps']

    # Get required params for model management and logging
    log_dir = str(args['exportdir'])
    graph_version = args['graphversion']

    batch_size = 200
    num_classes = 2

    try:
        train.status(tid, 0)

        # Get asset/embedding
        embeddings_index, embedding_matrix, embedding_size, word_index_mapping, pad_value = lucd_uds.get_asset(asset_id)

        train_df, eval_df, test_df = lucd_uds.train_eval_test_split_pytorch(virtual_dataset_id,
                                                                            evaluation_dataset_percent,
                                                                            testing_dataset_percent, None,
                                                                            None, process_data,
                                                                            text_data=True,
                                                                            word_index_mapping=word_index_mapping)

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = TextSentiment(len(word_index_mapping), embedding_size, num_classes).to(device)
    except Exception as exception:
        train.status(tid, 5, str(exception))
        log.debug(f"Caught exception: {exception}")
        raise

    try:
        train.status(tid, 1)

        # Create the optimizer, loss function, and LR scheduler once so their state
        # (and the learning-rate decay) persists across epochs
        criterion = torch.nn.CrossEntropyLoss().to(device)
        optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.9)

        for epoch in range(training_steps):
            start_time = time.time()
            train_loss, train_acc = train_func(train_df, batch_size, model, optimizer, criterion, scheduler)
            valid_loss, valid_acc = test(eval_df, batch_size, model)

            secs = int(time.time() - start_time)
            mins = secs // 60
            secs = secs % 60

            log.info('Epoch: %d | time in %d minutes, %d seconds' % (epoch + 1, mins, secs))
            log.info(f'Loss: {train_loss:.4f}(train) | Acc: {train_acc * 100:.1f}%(train)')
            log.info(f'Loss: {valid_loss:.4f}(valid) | Acc: {valid_acc * 100:.1f}%(valid)')

        # Save and zip model
        model_filename = lucd_uds.zip_pt_model(model, model_id, log_dir + '/imdb_model.pt', graph_version)

        train.status(tid, 3)

        # Store model graph and performance stats back to Lucd back-end
        with open(model_filename, "rb") as graph_file:
            log.debug("storing graph")
            log.debug("model id: " + str(model_id))

            train.update({tid: {
                'performance': {
                    'training loss': train_loss,
                    'training accuracy': train_acc,
                    'validation loss': valid_loss,
                    'validation accuracy': valid_acc
                },
                'graph_version': graph_version,
                'graph_file': graph_file.read()
            }})

        train.status(tid, 4)
    except Exception as exception:
        log.info(f"Caught exception: {exception}")
        train.status(tid, 5, "failure: error in training -- " + str(exception))
        raise
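
The collate function used above, lucd_uds.generate_pytorch_text_batch, is expected to yield (text, offsets, cls) batches in the format nn.EmbeddingBag consumes: all token ids concatenated into one 1-D tensor, with offsets marking where each example starts. A hand-rolled sketch of that format, following the standard torchtext pattern (an illustration of the expected shape, not the library's actual implementation):

import torch


def collate_text_batch(batch):
    # batch is assumed to be a list of (label, token_id_tensor) pairs
    labels = torch.tensor([label for label, _ in batch])
    texts = [tokens for _, tokens in batch]
    # each offset is the starting index of an example in the concatenated stream
    offsets = torch.tensor([0] + [len(t) for t in texts[:-1]]).cumsum(dim=0)
    text = torch.cat(texts)
    return text, offsets, labels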

Simple Regression

from eda.lib import lucd_uds
from eda import log

from torch.utils.data import DataLoader
import torch
import torch.nn.functional as F

from eda.int import train

import numpy as np

torch.manual_seed(1234)


class Net(torch.nn.Module):
    def __init__(self, n_feature, n_hidden, n_output):
        super(Net, self).__init__()
        self.hidden = torch.nn.Linear(n_feature, n_hidden)  # hidden layer
        self.predict = torch.nn.Linear(n_hidden, n_output)  # output layer

    def forward(self, x):
        x = F.relu(self.hidden(x))  # activation function for hidden layer
        x = self.predict(x)  # linear output

        return x


def process_data(features, targets):
    # Can be passed to lucd_uds.train_eval_test_split_pytorch for feature transformation, but will be
    # deprecated soon

    return features, targets


def main(args):
    # Get required training parameters
    tid = args['train_id']
    model_id = args['model']
    virtual_dataset_id = args['vds']

    lr = args['parameters']['lr']
    evaluation_dataset_percent = args['parameters']['eval_percent']
    testing_dataset_percent = args['parameters']['test_percent']
    training_steps = args['parameters']['steps']

    # Get required params for model management and logging
    log_dir = str(args['exportdir'])
    graph_version = args['graphversion']

    try:
        train.status(tid, 0)

        train_df, eval_df, test_df = lucd_uds.train_eval_test_split_pytorch(virtual_dataset_id,
                                                                            evaluation_dataset_percent,
                                                                            testing_dataset_percent,
                                                                            'Features', 'regression', process_data)

        # Create iterator objects for train and valid datasets
        trainloader = DataLoader(train_df, batch_size=10)
        validloader = DataLoader(eval_df, batch_size=10)
    except Exception as exception:
        train.status(tid, 5, str(exception))
        log.debug(f"Caught exception: {exception}")
        raise

    try:
        train.status(tid, 1)

        # Choose optimizer and loss function
        net = Net(n_feature=3, n_hidden=10, n_output=1)  # define the network
        optimizer = torch.optim.SGD(net.parameters(), lr=lr)
        loss_func = torch.nn.MSELoss()  # mean squared error loss for regression

        # Train
        train_loss, valid_loss = [], []
        for epoch in range(training_steps):
            for data in trainloader:
                net.train()
                out = net(data['Features'])

                # Flatten the (batch, 1) output to (batch,) so it matches the target shape
                out = out[:, -1]

                loss = loss_func(out, data['target'])
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()
                train_loss.append(loss.item())

            with torch.no_grad():
                for data in validloader:
                    net.eval()
                    out = net(data['Features'])

                    # Flatten the (batch, 1) output to (batch,) so it matches the target shape
                    out = out[:, -1]

                    loss = loss_func(out, data['target'])
                    valid_loss.append(loss.item())

        mean_loss = np.mean(valid_loss)

        # Save model
        torch.save(net.state_dict(), log_dir + '/iris_model.pt')

        # Store model graph and performance stats back to Lucd back-end
        with open(log_dir + '/iris_model.pt', "rb") as graph_file:
            log.debug("storing graph")
            log.debug("model id: " + str(model_id))

            train.update({tid: {
                'performance': {
                    'loss': mean_loss
                },
                'graph_version': graph_version,
                'graph_file': graph_file.read()
            }})

        train.status(tid, 3)
    except Exception as exception:
        log.info(f"Caught exception: {exception}")
        train.status(tid, 5, "failure: error in training -- " + str(exception))
        raise
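
Note that test_df is split out above but never consumed. A short sketch of a final held-out evaluation, which would sit inside the same try block after the validation loop, reusing names already in scope (net, loss_func, DataLoader, np, log):

        testloader = DataLoader(test_df, batch_size=10)

        net.eval()
        test_loss = []
        with torch.no_grad():
            for data in testloader:
                # Flatten the (batch, 1) output to (batch,), as in the loops above
                out = net(data['Features'])[:, -1]
                test_loss.append(loss_func(out, data['target']).item())

        log.info(f"held-out test MSE: {np.mean(test_loss)}")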

PyTorch Image Classification

from eda.lib import lucd_uds
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F
from eda.int import train

torch.manual_seed(1234)

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()

        # define layers
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=12, kernel_size=5)

        self.fc1 = nn.Linear(in_features=12 * 4 * 4, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=60)
        self.out = nn.Linear(in_features=60, out_features=10)

    def forward(self, t):
        # conv 1
        t = self.conv1(t)
        t = F.relu(t)
        t = F.max_pool2d(t, kernel_size=2, stride=2)

        # conv 2
        t = self.conv2(t)
        t = F.relu(t)
        t = F.max_pool2d(t, kernel_size=2, stride=2)

        # fc1
        t = t.reshape(-1, 12 * 4 * 4)
        t = self.fc1(t)
        t = F.relu(t)

        # fc2
        t = self.fc2(t)
        t = F.relu(t)

        # output
        t = self.out(t)

        return t


def process_data(features, targets):
    # Can be passed to lucd_uds.train_eval_test_split_pytorch for feature transformation, but will be
    # deprecated soon. Map string target values to integers here if needed (see Text Classification above).

    return features, targets


def main(args):
    lr = args["parameters"]['lr']
    vds = args['vds']
    testing_dataset_percent = args['parameters']['test_percent']
    evaluation_dataset_percent = args['parameters']['eval_percent']
    save_path = args['exportdir']
    tid = args['train_id']
    graph_version = args['graphversion']
    training_steps = args['parameters']['steps']

    train_df, eval_df, test_df = lucd_uds.train_eval_test_split_pytorch(vds,
                                                                        evaluation_dataset_percent,
                                                                        testing_dataset_percent, 'Images',
                                                                        'classification', process_data)


    # Create iterator objects for train and valid datasets
    trainloader = DataLoader(train_df, batch_size=10)
    validloader = DataLoader(eval_df, batch_size=10)

    # choose optimizer and loss function
    net = Net()
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(net.parameters(), lr=lr)

    # train
    final_loss = 0.0
    for epoch in range(training_steps):

        train_loss, valid_loss = [], []
        tl = 0

        for data in trainloader:

            net.train()
            optimizer.zero_grad()

            out = net(data['Images'])

            loss = criterion(out, data['target'])
            loss.backward()
            optimizer.step()
            train_loss.append(loss.item())
            tl += loss.item()

        # Record the epoch's mean training loss
        final_loss = tl / len(trainloader)

    correct = 0
    total = 0
    with torch.no_grad():
        for data in validloader:
            outputs = net(data['Images'])
            _, predicted = torch.max(outputs.data, 1)
            total += data['target'].size(0)
            correct += (predicted == data['target']).sum().item()

    accuracy = 100 * correct / total

    # Save model
    torch.save(net.state_dict(), save_path + '/fmnist_model.pt')

    # Store model graph and performance stats back to Lucd back-end
    with open(save_path + '/fmnist_model.pt', "rb") as graph_file:

        train.update({tid: {
            'performance': {
                'loss': final_loss,
                'accuracy': accuracy
            },
            'graph_version': graph_version,
            'graph_file': graph_file.read()
        }})

    train.status(tid, 3)
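
The layer sizes above assume single-channel 28x28 inputs (e.g., Fashion-MNIST): each 5x5 convolution followed by 2x2 max-pooling takes the spatial size 28 -> 24 -> 12 -> 8 -> 4, which is where the 12 * 4 * 4 flattened size in fc1 comes from. A minimal inference sketch against the saved weights (the random tensor stands in for a real image batch):

net = Net()
net.load_state_dict(torch.load(save_path + '/fmnist_model.pt'))
net.eval()

with torch.no_grad():
    image_batch = torch.rand(1, 1, 28, 28)  # (batch, channels, height, width)
    logits = net(image_batch)
    predicted_class = torch.argmax(logits, dim=1).item()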

XGBoost

from eda.lib.dask import EdaDask
from eda import log
from eda.lib import lucd_uds as uds
from eda.int import train

from sklearn.metrics import roc_auc_score

import dask_xgboost as dxgb


def main(args):
    virtual_dataset_id = args['vds']
    evaluation_dataset_percent = args['parameters']['eval_percent']
    save_path = args['exportdir']
    tid = args['train_id']
    graph_version = args['graphversion']
    training_steps = args['parameters']['steps']

    client = EdaDask().client()

    try:
        train.status(tid, 0)

        train_features, train_labels, eval_features, eval_labels, _, _ = \
            uds.train_eval_test_split_dataframe_2(virtual_dataset_id, evaluation_dataset_percent)
    except Exception as exception:
        train.status(tid, 5, str(exception))
        log.debug(f"Caught exception: {exception}")
        raise

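    # Binarize the species labels: 'I. versicolor' -> 0, 'I. virginica' and 'I. setosa' -> 1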
    train_labels = train_labels.mask(train_labels['flower.species'] == 'I. versicolor', 0)
    train_labels = train_labels.mask(train_labels['flower.species'] == 'I. virginica', 1)
    train_labels = train_labels.mask(train_labels['flower.species'] == 'I. setosa', 1)

    eval_labels = eval_labels.mask(eval_labels['flower.species'] == 'I. versicolor', 0)
    eval_labels = eval_labels.mask(eval_labels['flower.species'] == 'I. virginica', 1)
    eval_labels = eval_labels.mask(eval_labels['flower.species'] == 'I. setosa', 1)

    train_features = train_features.astype(float)
    eval_features = eval_features.astype(float)
    train_labels = train_labels.astype(int)
    eval_labels = eval_labels.astype(int)

    params = {'objective': 'binary:logistic',
              'max_depth': 4,
              'eta': 0.01,
              'subsample': 0.5,
              'min_child_weight': 0.5}


    try:
        train.status(tid, 1)

        log.info("starting training...")
        # train model; boosting rounds come from the GUI's 'steps' parameter
        bst = dxgb.train(client, params, train_features, train_labels, num_boost_round=training_steps)
        log.info("training done.")

        # run evaluation
        predictions = dxgb.predict(client, bst, eval_features).persist()

        # compute the ROC AUC score
        ra_score = roc_auc_score(eval_labels.compute(), predictions.compute())

        log.info(f"ROC AUC: {ra_score}")

        # Save model
        bst.save_model(save_path + '/model.xg')

        # Store model and performance stats back to Lucd back-end
        with open(save_path + '/model.xg', "rb") as graph_file:
            train.update({tid: {
                'performance': {
                    'roc_score': ra_score
                },
                'graph_version': graph_version,
                'graph_filename': 'model.xg',
                'graph_file': graph_file.read()
            }})

            train.status(tid, 3)
    except Exception as exception:
        log.info(f"Caught exception: {exception}")
        train.status(tid, 5, "failure: error in training -- " + str(exception))
        raise
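
The file written by bst.save_model can be reloaded with the plain xgboost API for scoring outside of Dask. A minimal sketch, where eval_features.compute() materializes the Dask frame locally (illustrative usage, not part of the training script):

import xgboost as xgb

bst = xgb.Booster()
bst.load_model(save_path + '/model.xg')

# Score a locally materialized frame
dmatrix = xgb.DMatrix(eval_features.compute())
predictions = bst.predict(dmatrix)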
