
Tutorial: Data Science Tutorial Part 3 of 3

Background on Lucd

The Lucd Enterprise AI Data Science Platform is a highly secure, scalable, open, and flexible platform for persisting and fusing large and numerous datasets and for training production AI models against those datasets. The Lucd platform is an end-to-end platform that can be deployed in public cloud environments, on premises on bare-metal hardware, or accessed directly as the Lucd multi-tenant PaaS. The platform consists of:

  • A scalable open data ingest capability
  • A petabyte scale unified data space data repository
  • 3-D Visualization and Exploration
  • An Exploratory Data Analysis Rest Service
  • A Kubernetes environment to train PyTorch and TensorFlow models
  • NLP Word Embedding and Explainable AI Assets
  • Model results visualization and exporting to internal or external serving capability

Introduction and Prerequisites

This tutorial demonstrates the steps required to train an AI model on data in the Lucd Data Science Platform. The tutorial is a toy example, built on the Iris dataset, designed to show the basic steps for training a model. In the example, a Virtual Data Set (VDS) is created, a custom operation adds a categorical feature to the existing continuous features, and then a custom model is developed and trained in the platform. Both the Lucd 3D UI and the Lucd Python Client are used during the tutorial. The tutorial is broken up into three parts:

  1. Part 1: Creating a Virtual Data Set (VDS) https://github.com/jmstadt/Tutorials/blob/master/Lucd_Part_1_of_3_Data_Science_Tutorial.ipynb
  2. Part 2: Performing a Custom Operation during Exploratory Data Analysis https://github.com/jmstadt/Tutorials/blob/master/Lucd_Part_2_of_3_Data_Science_Tutorial.ipynb
  3. Part 3: Developing a Custom AI Model and Training in the Lucd Platform

Prerequisites are:

1. Complete Part 1 and Part 2 of this Lucd Data Science Tutorial

At the end of Part 2, we had a dataframe loaded into our Lucd Python Client. We are going to train a TensorFlow classification model to predict the species from the continuous variables (from the original dataset) and the categorical variable (that we created with the custom operation).

For details of training an AI model in the Lucd platform refer to: https://community.lucd.ai/hc/en-us/articles/360022454472-Lucd-Modeling-Framework-v6-1-3

For this notebook, we will start by reloading the VDS we created at the end of Part 2

In [1]:
import lucd
from eda.lib import lucd_uds

Log in to the Lucd Enterprise AI Platform with the same credentials you used in Part 2

In [ ]:
client = lucd.LucdClient(domain="<your domain>",
                         username="<your username>",
                         password="<your password>",
                         )

Retrieve the VDS ID and pull it into a Dask dataframe, as at the end of Part 2

In [3]:
ddf = lucd_uds.get_dataframe("demo_9223370449329390901").reset_index(drop=True)
2020-04-22 14:32:00,439 | root | INFO | dask.py:28 | Creating Dask LocalCluster: http://localhost:60000/status
c:\users\markstadtmueller\anaconda3\lib\site-packages\distributed\dashboard\core.py:79: UserWarning: 
Port 60000 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.
  warnings.warn("\n" + msg)
{'demo_9223370449329390901': {'description': 'iris', 'model': {'data': ['flower.petal_length', 'flower.petal_width', 'flower.sepal_length', 'flower.sepal_width', 'flower.species', 'flower_mean', 'std.display', 'std.model', 'std.source', 'std.timestamp'], 'labels': ['flower.petal_length', 'flower.petal_width', 'flower.sepal_length', 'flower.sepal_width', 'flower.species', 'flower_mean', 'std.display', 'std.model', 'std.source', 'std.timestamp']}, 'name': 'april21_custom', 'operations': [{'command': 'custom', 'custom_operation_id': 'demo_9223370449329589395', 'dataset': '637231220528707971', 'orient': 'records', 'parameters': {'function_apply_method': 'map_partitions'}, 'return': '637231221401607846'}], 'query': {'dataset': '637231220528707971', 'query': {'bool': {'filter': [], 'must': [{'bool': {'should': [{'match_phrase': {'source': 'iris'}}]}}, {'range': {'content_date': {'gte': None, 'lt': None}}}], 'must_not': []}}, 'size': 100}, 'query_size': 150, 'username': 'demo'}}
()

You can call .head() on the new dataframe; your VDS now includes the "flower_mean" column created by your custom operation.
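
As a quick sanity check (a minimal sketch; on a Dask dataframe, .head() computes and returns the first few rows as a pandas DataFrame):

# expect the original flower.* columns plus the new flower_mean column
ddf.head()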

2. Encode the Labels and Categorical Feature

In [8]:
from eda.lib import lucd_uds
from sklearn.metrics import precision_recall_fscore_support
import tensorflow as tf
import numpy as np
In [6]:
def process_data(features, labels):
    # Encode species labels as integers (this mapping must agree with _label_mapping() and main()).
    new_labels = labels.replace(["I. versicolor", "I. setosa", "I. virginica"], [0, 2, 1])
    # Convert the boolean flower_mean feature to strings for the vocabulary column in the model.
    features['flower_mean'] = features['flower_mean'].replace([False, True], ["False", "True"])
    return features, new_labels
In [7]:
def _label_mapping():
    return {2: 'I. setosa', 1: 'I. virginica', 0: 'I. versicolor'}
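
As a quick local check of the encoding (illustrative only; it uses a small pandas frame rather than the Dask data the platform passes in):

import pandas as pd

toy_features = pd.DataFrame({'flower_mean': [True, False]})
toy_labels = pd.Series(['I. setosa', 'I. versicolor'])
f, l = process_data(toy_features, toy_labels)
# booleans become the strings "True"/"False"; species become the integer codes above
print(f['flower_mean'].tolist(), l.tolist())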

3. Define Model

In [9]:
def model(num_features, training_steps, learning_rate, log_dir, training_data, evaluation_data):
    type_dict = {"flower.petal_length": tf.float64,
                 "flower.petal_width": tf.float64,
                 "flower.sepal_length": tf.float64,
                 "flower.sepal_width": tf.float64,
                 "flower_mean": tf.string
                 }

    feature_labels = ["flower.petal_length", "flower.petal_width", "flower.sepal_length", "flower.sepal_width",
                      "flower_mean"]

    target_type = tf.int32

    num_classes = 3

    # Define the feature columns for inputs.
    feature_columns = [
        tf.feature_column.numeric_column(key="flower.petal_length"),
        tf.feature_column.numeric_column(key="flower.petal_width"),
        tf.feature_column.numeric_column(key="flower.sepal_length"),
        tf.feature_column.numeric_column(key="flower.sepal_width"),
        tf.feature_column.indicator_column(
            tf.feature_column.categorical_column_with_vocabulary_list(key='flower_mean',
                                                                      vocabulary_list=['True', 'False'])
        )
    ]

    serving_input_receiver_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
        tf.feature_column.make_parse_example_spec(feature_columns))

    # Create the Estimator
    training_config = tf.estimator.RunConfig(
        save_summary_steps=10,
        save_checkpoints_steps=10)

    classifier = tf.estimator.DNNClassifier(
        config=training_config,
        feature_columns=feature_columns,
        hidden_units=[10, 20, 10],
        n_classes=num_classes,
        model_dir=log_dir
    )

    train_spec = tf.estimator.TrainSpec(
        input_fn=lambda: lucd_uds.get_tf_dataset(feature_labels, type_dict, training_data, num_features,
                                                 target_type).repeat(count=None).shuffle(100).batch(100),
        max_steps=training_steps)

    latest_exporter = tf.estimator.LatestExporter(
        name="models",
        serving_input_receiver_fn=serving_input_receiver_fn,
        exports_to_keep=10)

    eval_spec = tf.estimator.EvalSpec(
        input_fn=lambda: lucd_uds.get_tf_dataset(feature_labels, type_dict, evaluation_data, num_features,
                                                 target_type).repeat(count=None).shuffle(100).batch(100),
        exporters=latest_exporter)

    return classifier, train_spec, eval_spec, feature_labels, type_dict, target_type
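
The only non-numeric input is flower_mean; the indicator column one-hot encodes it from its two-value string vocabulary. A minimal standalone sketch of that behavior (illustrative only, using the same tf.feature_column API as above):

import tensorflow as tf

cat_col = tf.feature_column.categorical_column_with_vocabulary_list(
    key='flower_mean', vocabulary_list=['True', 'False'])
dense = tf.keras.layers.DenseFeatures([tf.feature_column.indicator_column(cat_col)])
# With the vocabulary order above, 'True' maps to [1., 0.] and 'False' to [0., 1.]
print(dense({'flower_mean': tf.constant([['True'], ['False']])}))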

4. Get Predictions

In [10]:
def get_predictions_classification(_estimator, input_fn, mode):
    """ Returns a list of predicted values using a TensorFlow estimator on a given TensorFlow DataSet.

    Args:
        _estimator: TensorFlow estimator to use for getting predictions.
        input_fn: TensorFlow input function feeding DataSet with testing/hold-out data.
        mode: String indicating type of classification done by the estimator: "binary", "multiclass", or "tf_premade_multiclass".

    Returns:
        List of predictions.
    """

    predictions = _estimator.predict(input_fn=input_fn)

    return_list = []

    for pred in predictions:
        if mode == "binary":
            for key in pred:
                # log.debug(f"\n\n\nKEY: {key}")
                prob_value = pred[key][0]
                break

            if prob_value > .5:
                return_list.append(1)
            else:
                return_list.append(0)
        elif mode == "multiclass":
            for key in pred:
                prob_value = pred[key]
                break

            return_list.append(np.argmax(prob_value))
        elif mode == "tf_premade_multiclass":
            return_list.append(pred["class_ids"][0])

    return return_list
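
For the premade DNNClassifier used in this tutorial, each prediction dict contains a "class_ids" entry, so the "tf_premade_multiclass" branch simply reads the first class id per example. A tiny check with a fake estimator (a hypothetical stand-in, not a real tf.estimator object):

import numpy as np

class _FakePredictor:
    # Mimics estimator.predict() by yielding prediction dicts like DNNClassifier's.
    def predict(self, input_fn):
        return iter([{'class_ids': np.array([2])}, {'class_ids': np.array([0])}])

print(get_predictions_classification(_FakePredictor(), input_fn=None, mode="tf_premade_multiclass"))
# -> [2, 0]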

5. Define Main

In [11]:
def main(args):
    # Get required training parameters
    tid = args['train_id']
    vds = args['vds']

    lr = args['parameters']['lr']
    evaluation_dataset_percent = args['parameters']['eval_percent']
    testing_dataset_percent = args['parameters']['test_percent']
    training_steps = args['parameters']['steps']
    save_path = args['exportdir']

    delayed_values_training, delayed_values_evaluation, delayed_values_testing, my_df_testing_label, num_features =\
        lucd_uds.train_test_eval_split_tensorflow(vds,
                                                  evaluation_dataset_percent,
                                                  testing_dataset_percent,
                                                  process_data)

    print(f"Train: {len(delayed_values_training)}, Test: {len(delayed_values_testing)}")

    _estimator, train_spec, eval_spec, feature_labels, type_dict, target_type = \
        model(num_features, training_steps, lr, save_path, delayed_values_training,
              delayed_values_evaluation)

    _tuple = tf.estimator.train_and_evaluate(_estimator, train_spec=train_spec, eval_spec=eval_spec)

    print(_tuple)

    # compute confusion matrix
    mode = "tf_premade_multiclass"
    predictions = get_predictions_classification(_estimator,
                                                 lambda: lucd_uds.get_tf_dataset(feature_labels,
                                                                                 type_dict,
                                                                                 delayed_values_testing,
                                                                                 num_features,
                                                                                 target_type).batch(1),
                                                 mode)

    actual = (my_df_testing_label.compute()).iloc[:, 0].tolist()
    for index, item in enumerate(actual):
        if item == 'I. versicolor':
            actual[index] = 0
        elif item == 'I. virginica':
            actual[index] = 1
        else:
            actual[index] = 2

    label_mapping = _label_mapping()

    cm_out = tf.math.confusion_matrix(actual, predictions)
    cm_out = cm_out.numpy()
    print(cm_out)
    cm_list = []
    i = 0
    for r in cm_out:
        j = 0
        for c in r:
            i_string = label_mapping[i]
            j_string = label_mapping[j]
            cm_list.append(i_string + ':' + j_string + ':' + str(cm_out[i, j]))
            j += 1
        i += 1
    cm_string = ';'.join(cm_list)
    print(cm_string)

    # compute average precision, recall, and f1 score for multi-class (not multi-label)
    actual_np = np.array(actual)
    predictions_np = np.array(predictions)
    precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                 average='macro')
    precision_micro, recall_micro, f1_micro, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                 average='micro')
    precision_weighted, recall_weighted, f1_weighted, _ = precision_recall_fscore_support(actual_np, predictions_np,
                                                                                          average='weighted')

    # compute precision, recall, and f1 score per label
    labels = list(set(actual))
    results = precision_recall_fscore_support(actual_np, predictions_np, average=None, labels=labels)
    results_string = ''
    for i in range(0, len(labels)):
        # results is a (precision, recall, f1, support) tuple of per-label arrays,
        # so index each metric array at position i to get that label's stats
        stat_list = [str(metric[i]) for metric in results]
        stat_string = ','.join(stat_list)
        raw_label = labels[i]
        results_string += (label_mapping[raw_label] + '-')
        results_string += (stat_string + ';')
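
As a toy check of the sklearn call used above (illustrative only): with average=None, precision_recall_fscore_support returns a (precision, recall, f1, support) tuple of arrays, one entry per label.

from sklearn.metrics import precision_recall_fscore_support
import numpy as np

p, r, f1, support = precision_recall_fscore_support(
    np.array([0, 1, 2, 2]), np.array([0, 1, 2, 1]), average=None, labels=[0, 1, 2])
# per-label arrays, e.g. precision is [1.0, 0.5, 1.0] for this toy example
print(p, r, f1, support)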

6. Test Locally

In [8]:
from lucd import LucdClient, log

# Use this for local EDA testing / development
username = '<your username>'
client = LucdClient(domain='<your domain>',
                    username=username,
                    password='<your password>')

client.set_max_data(limit=1000)

main({
    'train_id': '<your_local_name_for_reference>',
    'vds': '<your vds id from above>',
    'exportdir': '.',
    'parameters': {
        'lr': 0.01,
        'eval_percent': 0.10,
        'test_percent': 0.10,
        'steps': 100
    }
})

log.info("Model Training Complete.")

client.close()
2020-04-22 15:04:08,916 | root | INFO | dask.py:28 | Creating Dask LocalCluster: http://localhost:60000/status
c:\users\markstadtmueller\anaconda3\lib\site-packages\distributed\dashboard\core.py:79: UserWarning: 
Port 60000 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.
  warnings.warn("\n" + msg)
2020-04-22 15:04:25,940 | tensorflow | INFO | estimator.py:216 | Using config: {'_model_dir': '.', '_tf_random_seed': None, '_save_summary_steps': 10, '_save_checkpoints_steps': 10, '_save_checkpoints_secs': None, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
2020-04-22 15:04:25,941 | tensorflow | INFO | estimator_training.py:186 | Not using Distribute Coordinator.
2020-04-22 15:04:25,942 | tensorflow | INFO | training.py:612 | Running training and evaluation locally (non-distributed).
2020-04-22 15:04:25,943 | tensorflow | INFO | training.py:700 | Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps 10 or save_checkpoints_secs None.
2020-04-22 15:04:25,960 | tensorflow | INFO | estimator.py:367 | Skipping training since max_steps has already saved.
2020-04-22 15:04:25,990 | tensorflow | WARNING | deprecation.py:506 | From c:\users\markstadtmueller\anaconda3\lib\site-packages\tensorflow_core\python\ops\resource_variable_ops.py:1635: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.
{'demo_9223370449524874138': {'description': 'Tutorial', 'model': {'data': ['flower.petal_length', 'flower.petal_width', 'flower.sepal_length', 'flower.sepal_width', 'flower_mean'], 'labels': ['flower.species']}, 'name': 'My_IRIS_VDS_1', 'operations': [{'command': 'custom', 'custom_operation_id': 'demo_9223370449703715244', 'dataset': '637229128990491299', 'orient': 'records', 'parameters': {'function_apply_method': 'map_partitions'}, 'return': '637229265635208812'}], 'query': {'aggs': {'agg_source': {'aggs': {'agg_model': {'aggs': {'topHits': {'top_hits': {'size': 10}}}, 'terms': {'field': 'model'}}}, 'terms': {'field': 'source'}}}, 'dataset': '637229128990491299', 'query': {'function_score': {'functions': [{'random_score': {}}], 'query': {'bool': {'filter': [], 'must': [{'bool': {'should': [{'match_phrase': {'source': 'iris'}}]}}, {'range': {'content_date': {'gte': None, 'lt': None}}}], 'must_not': []}}}}, 'size': 100}, 'query_size': 150, 'username': 'demo'}}
()
Train: 1, Test: 1
(None, None)
2020-04-22 15:04:26,079 | tensorflow | INFO | estimator.py:1151 | Calling model_fn.
2020-04-22 15:04:26,101 | tensorflow | WARNING | base_layer.py:1790 | Layer dnn is casting an input tensor from dtype float64 to the layer's dtype of float32, which is new behavior in TensorFlow 2.  The layer has dtype float32 because it's dtype defaults to floatx.

If you intended to run this layer in float32, you can safely ignore this warning. If in doubt, this warning is likely only an issue if you are porting a TensorFlow 1.X model to TensorFlow 2.

To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

2020-04-22 15:04:26,535 | tensorflow | WARNING | deprecation.py:323 | From c:\users\markstadtmueller\anaconda3\lib\site-packages\tensorflow_core\python\feature_column\feature_column_v2.py:4267: IndicatorColumn._variable_shape (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
2020-04-22 15:04:26,535 | tensorflow | WARNING | deprecation.py:323 | From c:\users\markstadtmueller\anaconda3\lib\site-packages\tensorflow_core\python\feature_column\feature_column_v2.py:4322: VocabularyListCategoricalColumn._num_buckets (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
2020-04-22 15:04:26,843 | tensorflow | INFO | estimator.py:1153 | Done calling model_fn.
2020-04-22 15:04:27,066 | tensorflow | INFO | monitored_session.py:246 | Graph was finalized.
2020-04-22 15:04:27,083 | tensorflow | INFO | saver.py:1284 | Restoring parameters from .\model.ckpt-10000
2020-04-22 15:04:27,160 | tensorflow | INFO | session_manager.py:504 | Running local_init_op.
2020-04-22 15:04:27,171 | tensorflow | INFO | session_manager.py:507 | Done running local_init_op.
c:\users\markstadtmueller\anaconda3\lib\site-packages\sklearn\metrics\_classification.py:1272: UndefinedMetricWarning: Precision and F-score are ill-defined and being set to 0.0 in labels with no predicted samples. Use `zero_division` parameter to control this behavior.
  _warn_prf(average, modifier, msg_start, len(result))
2020-04-22 15:04:28,142 | root | INFO | <ipython-input-8-a83b4267b074>:23 | Model Training Complete.
[[ 0 11  0]
 [ 0 15  0]
 [ 0  7  0]]
I. versicolor:I. versicolor:0;I. versicolor:I. virginica:11;I. versicolor:I. setosa:0;I. virginica:I. versicolor:0;I. virginica:I. virginica:15;I. virginica:I. setosa:0;I. setosa:I. versicolor:0;I. setosa:I. virginica:7;I. setosa:I. setosa:0

7. Save Sections 2 through 5 as a .py file

Download the notebook as a .py file, trim out the other sections (keeping only sections 2 through 5), and save it for uploading.
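
The trimmed .py file should contain just the functions from sections 2 through 5, roughly (file name is hypothetical):

# iris_tutorial_model.py (hypothetical name)
#   process_data(features, labels)            from section 2
#   _label_mapping()                           from section 2
#   model(...)                                 from section 3
#   get_predictions_classification(...)        from section 4
#   main(args)                                 from section 5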

Upload Model into Lucd Platform and Train Model

Go to the modeling tab in the platform and upload the model

Enter the model information and click the green check mark

Hit refresh and select your model by clicking on it

On the Training pop-up, select your VDS, enter your training parameters, and hit the green check mark

You will see your model with a yellow dot next to it, indicating that training has started; the dot will turn green when training is complete

From the upper-left pull-down menu, select Trained Models. When training is complete, your model will appear there; click on it to see the training results on the right-hand side

On the lower right-hand side, you can select the download button ("Download Artifacts") to export your trained model

This ends the tutorial. It walked through creating a VDS, performing a custom operation, and creating, training, and exporting an AI model. Many more capabilities are available in the platform and can be explored individually or via additional tutorials, articles, and user guides.
