Scaling ML using Cloud ML Engine

Me on Twitter: @Henrywebel

Contents

  1. Data Science Workflow
  2. Developing code
  3. Hands-ON Tutorial: Running MNIST on ML-Engine
  4. Setup Runtime for Notebook
  5. Load Data from BQ
  6. Package Model
  7. Train using ML-Engine
  8. Deployment
  9. Predictions
  10. Recap
  11. Appendix: Jupyter Slides

Shortcut: run the first cells, then jump to any part of the notebook

This will only work after the initial setup (see below)!

In [ ]:
# Fragment to initialize working with this notebook on the cloud
# check working directory
from utils import chdir_
pwd = chdir_()
## Import Tensorflow
try:
    import tensorflow as tf
except ModuleNotFoundError:
    raise ModuleNotFoundError("Install Tensorflow")
tf.__version__ 
In [ ]:
## import config:
import yaml
from pprint import pprint
with open("config.yaml", "r", encoding = "utf8") as f:
    config = yaml.safe_load(f)
pprint(config)
In [ ]:
## setup env-variables 
import os
import platform
PROJECT = config['project-id'] 
REGION = config['region'] # Choose an available region for Cloud MLE from https://cloud.google.com/ml-engine/docs/regions.
BUCKET = config['bucket'] # REPLACE WITH YOUR BUCKET NAME. Use a regional bucket in the region you selected.
PKG_NAME = config['pkg-name']
try: 
    TEST_DATA_JSON = config['testdatafile'] 
    os.environ['TEST_DATA_JSON'] = TEST_DATA_JSON
except: pass

os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION 
os.environ['TFVERSION'] = str(config['tf-version'])  # Tensorflow version 1.4 before
os.environ['PKG_NAME'] = PKG_NAME
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config['credentials']
In [ ]:
# Set new OUTPUT and DATA directory on GS
OUTDIR = '/'.join(['gs:/', BUCKET, PKG_NAME, 'trained'])
DATA = '/'.join(['gs:/', BUCKET, PKG_NAME, 'data', 'mnist.npz'])
%env OUTDIR $OUTDIR
%env DATA $DATA
In [ ]:
import sys
local_python = sys.executable
%env PYTHON_LOCAL $local_python 
In [ ]:
%%cmd
gcloud config set project %PROJECT%
gcloud config set compute/region %REGION%
gcloud config set ml_engine/local_python "%PYTHON_LOCAL%"

Data Science Workflow (DSP)

  • Goal is to standardise the development of models
    • Checklist of necessary technical steps

Vision: Achieve a first end-to-end model in production within a product increment of 10 weeks

Scale out: Scale without having to rewrite your model

Data Science Pipeline (DSP) - Checklist

Scaling Michelangelo - Data Science Process at Uber

Data Science Process at Uber

Step 1: Preparation                    Step 2: Data exploration and model building        Step 3: Model deployment
1.1 Define business and project goal   2.1 Define and set up ML project infrastructure    3.1 Model industrialization
1.2 Quick data exploration             2.2 Data exploration and visualization             3.2 Gather and analyze insights (...)
1.3 ML models strategy                 2.3 Build and evaluate a model                     -
-                                      2.4 Interpretability of ML model                   -
-                                      2.5 Productionize and deploy the ML models         -

Steps 1 and 2 can be done entirely locally

Developing code

Using your own laptop:

  • Cloud SDK on your laptop (CLI)
  • your IDE (e.g. PyCharm)
  • Jupyter Notebook
  • your conda env
  • gcloud ml-engine local

Simple Cloud setup using

Your laptop

laptop-icon-24

  1. Call your python script (module) in your conda env
  2. Use gcloud ml-engine local train

AI Platform Notebooks: Deep Learning VM

Deep Learning VM

  • Preconfigured (Deep Learning) VMs for ML prototyping
    • only CPUs possible
  • you use a preconfigured runtime compatible with ML Engine runtimes for deployment

A cluster of machines using ML-Engine service

CloudMachineLearning.png

  • runs a script "autonomously" on the cloud and stops afterwards
  • offers different types of clusters to run on
  • invoked via gcloud ml-engine jobs submit training

Summary

Develop on your laptop if you are comfortable with setting up your environments.

Otherwise develop on a preconfigured Notebook instance without too much compute attached to it.

Migrate to ML-Engine Cluster on GCP to

  • distribute learning on several machines
  • serve model 24/7

Hands-ON Tutorial: Running MNIST on ML-Engine

  • deep dive into steps 2 and 3 of the proposed Data Science process
  • data exploration is omitted since a curated dataset is used
  • some titles reference the previously described Data Science Process, e.g. DSP 2.3

  • black-and-white images are numeric vectors (features 1-784)
  • ten labels (digits 0-9)
  • recognise hand-written digits (e.g. on a postcard)
  • standardise inputs to the 0-1 range (e.g. using Apache Beam); see the sketch below
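A minimal sketch (not part of the package used later) of the 0-1 scaling with numpy; for larger datasets the same idea could be expressed as an Apache Beam transform:

import numpy as np

x = np.random.randint(0, 256, size=(4, 28, 28))  # stand-in for four MNIST images
x_scaled = x.astype('float32') / 255.0           # scale 0-255 pixel values to 0-1
print(x_scaled.min(), x_scaled.max())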

GCP services used in Tutorial

Today we will look at the following GCP services:

DSP 2.1: Setup

  1. ML Engine Runtimes
  2. Repository Structure
  3. Configuration Variables
    • Environment variables to set
    • How to add them to your runtime
  4. Setup gcloud runtime

Create conda environment

 conda env create -f environment.yml -n env_gcp_dl
 conda activate env_gcp_dl
 jupyter notebook

Starts notebook-server with all packages in your current path

Change working directory

  • In order to import functionality from src later in this notebook, it is necessary to change to the root directory of the repository (the parent of the notebook directory)
In [ ]:
# check working directory
import os
WORKINGDIR = os.path.normpath(os.getcwd())
print("Current Working direcotory:\t{}".format(WORKINGDIR))
folders = WORKINGDIR.split(os.sep)
if folders.pop() in ['notebook', 'src', 'talks']:
  WORKINGDIR = os.sep.join(folders)
  print("Changed to New working directory:\t{dir}".format(dir=WORKINGDIR))
  os.chdir(WORKINGDIR) 

ML Engine Runtimes

Default ML-Engine Runtimes depend on the Tensorflow Version

In [ ]:
#!conda install tensorflow=1.13
In [ ]:
import tensorflow as tf
tf.__version__

Repository structure

In [ ]:
!ls | grep "DIR\|yaml"

Key Directories containing information

.
+-- data
+-- src
|  +-- models
|  +-- packages
config.yaml

In the next step the contents of config.yaml will be important

GCP Environment Variables

  • PROJECT_ID: unique ID that identifies your project, e.g. ml-productive-pipeline-12345
  • BUCKET: BLOB-store ID. By default, each project has a bucket named after the PROJECT_ID
  • REGION: Which data center to use

In Europe, all Cloud ML Engine services are only available in europe-west1

  • all products per Region in europe: link
In [ ]:
# #Create config manually and save as yaml:
import yaml
config = {}
config['project-id'] = 'ml-productive-pipeline-12345'  # # REPLACE WITH YOUR PROJECT ID
config['region'] = 'europe-west1' # Choose an available region for Cloud MLE from https://cloud.google.com/ml-engine/docs/regions.
config['bucket'] = 'ml-productive-pipeline-12345'  # REPLACE WITH YOUR BUCKET NAME. Use a regional bucket in the region you selected.
config['pkg-name'] = 'pkg_mnist_fnn'
config['tf-version'] = '1.13'
config['env-name'] = 'gcp_dl'
config['credentials'] = 'cred/service_account_123.json'
with open("config.yaml", 'w', encoding= 'utf8') as f:
      yaml.dump(config, stream=f,  default_flow_style=False) 

ML-Engine Environment Variables

Additional Environment Variables needed for ML-Engine

  • PKG_NAME: package name which will contain your model
  • TFVERSION: TensorFlow version
In [ ]:
import yaml
from pprint import pprint
with open("config.yaml", "r", encoding = "utf8") as f:
    config = yaml.safe_load(f)
pprint(config)

Adding Environment Variables to your runtime

  • add variables persistently to the runtime of your kernel from Jupyter (or Datalab)
  • use the os.environ dictionary
  • behind a proxy, configure globally (see the sketch below)
    • REQUESTS_CA_BUNDLE: optional, file path to your SSL certificate (works for the requests package)
    • HTTPS_PROXY: optional, URL of your proxy, possibly including authentication or port
  • possibility to set environment variables permanently for the user
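A hedged sketch of the proxy configuration; the proxy URL and certificate path below are placeholders and have to be replaced with your own values:

import os

# placeholders -- replace with your corporate proxy and certificate bundle
os.environ['HTTPS_PROXY'] = 'http://user:password@proxy.example.com:8080'
os.environ['REQUESTS_CA_BUNDLE'] = r'C:\certs\corporate_ca_bundle.pem'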
In [ ]:
## setup env-variables 
import os
import platform
PROJECT = config['project-id'] 
REGION = config['region'] # Choose an available region for Cloud MLE from https://cloud.google.com/ml-engine/docs/regions.
BUCKET = config['bucket'] # REPLACE WITH YOUR BUCKET NAME. Use a regional bucket in the region you selected.
PKG_NAME = config['pkg-name']
#TEST_DATA_JSON = config['testdatafile'] # added later

os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION
os.environ['TFVERSION'] = str(config['tf-version'])  # Tensorflow version 1.4 before
os.environ['PKG_NAME'] = PKG_NAME
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config['credentials']

Access Environment Variables

  • Now you can access the environment variables in the terminal where your Jupyter, Datalab or IPython session runs.
In [ ]:
!echo "Using Tensorflow Version: %TFVERSION%"

Setup gcloud runtime

In [ ]:
import sys
local_python = sys.executable
%env PYTHON_LOCAL $local_python 
In [ ]:
%%cmd
gcloud config set project %PROJECT%
gcloud config set compute/region %REGION%
gcloud config set ml_engine/local_python "%PYTHON_LOCAL%"

Access Control

  • sign in and let clients pick up credentials from GCloud SDK (this stores a json with your credentials on your machine)

      gcloud auth application-default login
  • Service Accounts (Creating and Managing Service Accounts)

    • need to be assigned read/write permissions on the BUCKET (see the sketch below)
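A minimal sketch of loading the service-account key referenced in config.yaml explicitly (instead of relying on GOOGLE_APPLICATION_CREDENTIALS); the bucket listing at the end simply verifies read access:

from google.oauth2 import service_account
from google.cloud import storage

# build credentials from the key file referenced in config.yaml
credentials = service_account.Credentials.from_service_account_file(
    config['credentials'],
    scopes=['https://www.googleapis.com/auth/cloud-platform'],
)
client = storage.Client(project=config['project-id'], credentials=credentials)
print([b.name for b in client.list_buckets()])  # requires storage read permission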

Load Data: Bigquery Client (DSP 2.2 )

There are several python clients available, see list. Here we use bigquery to load some data.

Picks up HTTPS_PROXY, REQUESTS_CA_BUNDLE, PROJECT_ID from the environment

  • set all relevant variables as user environment variables
    1. search "env" in the Windows search bar (press the Windows key)
    2. select "Edit environment variables for your account"
    3. select "New" and add HTTPS_PROXY, REQUESTS_CA_BUNDLE, PROJECT_ID

Example: Download from public dataset

In [ ]:
# pip install --upgrade google-cloud-bigquery
from google.cloud import bigquery
import os

PROJECT_ID = os.environ['PROJECT']
print("# Current project in use: {}\n".format(PROJECT_ID))
client = bigquery.Client(project=PROJECT_ID, )

sql = """
    SELECT *
    FROM `bigquery-public-data.usa_names.usa_1910_current`
    WHERE state = 'TX'
    LIMIT 10
"""
df = client.query(sql).to_dataframe()
print(df)

Download from project table

  • uses the test dataset with table DATA of the project (has to be created first)
In [ ]:
sql = """
    SELECT *
    FROM `{project}.test.DATA`
    LIMIT 15
""".format(project=PROJECT)
df = client.query(sql).to_dataframe()
df.head()
In [ ]:
sql = """
    SELECT COUNT(label) as count
    FROM `{project}.test.DATA`
    GROUP BY label
""".format(project=PROJECT)
df = client.query(sql).to_dataframe()
df.transpose()

Downloading the entire table to pandas

  • the BigQuery default maximum response size of 128 MB (see quotas) does not allow downloading the entire table
  • the bigquery_storage client has to be used to download large datasets
In [ ]:
import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
print("Credentials: {}".format(credentials))
print("PROJECT: {}".format(PROJECT))

# Make clients.
client = bigquery.Client(
    credentials=credentials,
    project=PROJECT
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
) 

Download to pandas dataframe

  • can take a very long time
In [ ]:
# Download a table.
table = bigquery.TableReference.from_string(
    "{project}.test.DATA".format(project=PROJECT)
)
rows = client.list_rows(
    table,
    #selected_fields=[ 
    #    bigquery.SchemaField("label", "INTEGER")
    #],
)
df = rows.to_dataframe(bqstorage_client=bqstorageclient)
In [ ]:
df.head()
In [ ]:
import numpy as np
np.save(file='data/mnist/raw/mnist_all', allow_pickle=True, arr=df.to_numpy())

Model: Packaging model (DSP 2.3)

Take your code and put it into a standard Python package structure, see Recommended package structure

Key-Idea:

  • define entry point which can be called
  • write all tasks as a function (callable)

Why a package?

  • can be called from other scripts via import model (usage sketch below)
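For illustration, a hedged sketch of calling the entry point directly with an args dictionary whose keys match the ones used in train_and_evaluate below; paths and values are examples only:

from src.pkg_mnist_fnn.model import train_and_evaluate

args = {
    'data_path': 'data/mnist/raw/mnist.npz',   # example path
    'output_dir': 'trained/pkg_mnist_fnn',
    'hidden_units': [256, 128, 64],
    'learning_rate': 0.001,
    'train_batch_size': 128,
    'train_steps': 200,
    'eval_delay_secs': 10,                     # assumed value
    'min_eval_frequency': 60,                  # assumed value
}
train_and_evaluate(args)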

model.py

load most recent version, if needed:

Imports, Helper Functions

# First try to start Cloud ML using the MNIST example.
import tensorflow as tf
import numpy as np

from .utils import load_data
##########################################################################
#Factor into config:
IMAGE_SHAPE = (28,28)
N_PIXEL = 28 * 28
NUM_LABELS = 10

BATCH_SIZE = 128
EPOCHS = 5
##########################################################################
def parse_images(x):
    return x.reshape(len(x), -1).astype('float32')


def parse_labels(y):
    return y.astype('int32')

Input-Function used when Model is trained

def numpy_input_fn(images: np.ndarray,
                   labels: np.ndarray,
                   mode=tf.estimator.ModeKeys.EVAL,
                   epochs=EPOCHS,
                   batch_size=BATCH_SIZE):
    """
    Return depending on the `mode`-key an Interator which can be use to
    feed into the Estimator-Model. 

    Alternative if a `tf.data.Dataset` named `dataset` would be created:
    `dataset.make_one_shot_iterator().get_next()`
    """
    if mode == tf.estimator.ModeKeys.TRAIN:
        _epochs = epochs
        _shuffle = True
        _num_threads = 1 # This leads to doubling the number of epochs
    else:
        _epochs = 1
        _shuffle = False
        _num_threads = 1

    return tf.estimator.inputs.numpy_input_fn(
        {'x': images},
        y=labels,
        batch_size=batch_size,
        num_epochs=_epochs,
        shuffle=_shuffle,     # Boolean; if True, shuffles the queue. Avoid shuffling at prediction time.
        queue_capacity=1000,  # Integer; size of the queue used for reading and enqueueing.
        # To have a predictable order of reading and enqueueing, such as in
        # prediction and evaluation mode, num_threads should be 1.
        num_threads=_num_threads
    )

Input-Function used when Model is served

def serving_input_fn():
    feature_placeholders = {
        'x': tf.placeholder(tf.float32, shape=[None, N_PIXEL])
    }
    features = feature_placeholders
    return tf.estimator.export.ServingInputReceiver(
         features=features, 
         receiver_tensors=feature_placeholders,
         receiver_tensors_alternatives=None
         )
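For reference, this serving signature expects instances with a single key 'x' holding the 784 flattened pixel values; a minimal sketch of building such an instance (stand-in image, not from the dataset):

import json
import numpy as np

image = np.zeros((28, 28), dtype='float32')   # stand-in for a digit image
instance = {'x': image.reshape(-1).tolist()}  # 784 floats
print(json.dumps(instance)[:60] + ' ...')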

Entrypoint (main function)

def train_and_evaluate(args):
    """
    Utility function for distributed training on ML-Engine
    www.tensorflow.org/api_docs/python/tf/estimator/train_and_evaluate 
    """
    ##########################################
    # Load data in memory
    # ToDo: replace numpy-arrays
    print('## load data, specified path to try: {}'.format(args['data_path']))
    (x_train, y_train), (x_test, y_test) = load_data(
        path=args['data_path'])

    x_train = parse_images(x_train)
    x_test = parse_images(x_test)

    y_train = parse_labels(y_train)
    y_test = parse_labels(y_test)

    model = tf.estimator.DNNClassifier(
        hidden_units= args['hidden_units'],  #[256, 128, 64],
        feature_columns=[tf.feature_column.numeric_column(
            'x', shape=[N_PIXEL, ])],
        model_dir=args['output_dir'],
        n_classes=NUM_LABELS,
        optimizer=tf.train.AdamOptimizer(learning_rate=args['learning_rate']),
        # activation_fn=,
        dropout=0.2,
        batch_norm=False,
        loss_reduction='weighted_sum',
        warm_start_from=None,
        config=tf.estimator.RunConfig(# save_summary_steps=200,
                                      save_checkpoints_steps=400,
                                      keep_checkpoint_max=5, 
                                      keep_checkpoint_every_n_hours=1,
                                      #log_step_count_steps=100,
                                      train_distribute=None,
                                      )
    )
    ## to cont.
    train_spec = tf.estimator.TrainSpec(
        input_fn=numpy_input_fn(
            x_train, y_train, mode=tf.estimator.ModeKeys.TRAIN, 
            batch_size = args['train_batch_size']),    
        max_steps=args['train_steps'],
        # hooks = None
    )
    # use `LatestExporter` for regular model exports:
    exporter = tf.estimator.LatestExporter('exporter', serving_input_fn)
    eval_spec = tf.estimator.EvalSpec(
        input_fn=numpy_input_fn(
            x_test, y_test, mode=tf.estimator.ModeKeys.EVAL),
        # steps=100,
        start_delay_secs=args['eval_delay_secs'],
        throttle_secs=args['min_eval_frequency'],
        exporters=exporter
    )
    print("## start training and evaluation\n"
          "### save model, ckpts, etc. to: {}".format(args['output_dir']))

    tf.estimator.train_and_evaluate(
        estimator=model, train_spec=train_spec, eval_spec=eval_spec)

task.py

write contents to file:

# Parse arguments and call main function
import os
import json
import argparse
import shutil
from pprint import pprint 

from .model import train_and_evaluate

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--data_path',
        help='GCS or local path to training data',
        required=True
    )
    parser.add_argument(
        '--output_dir',
        help='GCS location to write checkpoints and export models',
        required=True
    )
    parser.add_argument(
        '--train_batch_size',
        help='Batch size for training steps',
        type=int,
        default='128'
    )
    parser.add_argument(
        '--train_steps',
        help='Steps to run the training job for',
        type=int,
        default='200'
    )
    parser.add_argument(
        '--learning_rate',
        help='Learning Rate used for Adam',
        type=float,
        default='0.001'
    )
    parser.add_argument(
        '--hidden_units',
        help = 'Hidden layer sizes to use for DNN feature columns -- provide space-separated layers',
        type = str,
        default = "256 128 64"
    )
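
The listing stops after the argument definitions. A hedged sketch of how the remainder of task.py could continue: accept the --job_dir argument passed by the gcloud calls later in this notebook, turn hidden_units into a list of integers and hand everything to train_and_evaluate; the defaults for eval_delay_secs and min_eval_frequency are assumptions:

    # Sketch of the remaining lines of task.py
    parser.add_argument(
        '--job_dir',
        help='Kept for compatibility with gcloud, not used by the model',
        default='tmp'
    )
    args = vars(parser.parse_args())
    args.pop('job_dir', None)
    # hidden_units arrives as a space-separated string, e.g. "256 128 64"
    args['hidden_units'] = [int(units) for units in args['hidden_units'].split()]
    args.setdefault('eval_delay_secs', 10)     # assumed default
    args.setdefault('min_eval_frequency', 60)  # assumed default
    pprint(args)
    train_and_evaluate(args)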

Add empty __init__.py to create package

In [ ]:
%%writefile src/pkg_mnist_fnn/__init__.py

Add function to load data

In [ ]:
%%writefile src/pkg_mnist_fnn/utils.py
import os
from io import BytesIO

import numpy as np
import tensorflow as tf
from tensorflow.python.lib.io import file_io


def load_data(path='./data/'):
    """
    Load data in memory from local source, from data-repository
    or bucket (ToDo)

    Return
    -----
    x_train: numpy.array
        Shape: (60000, 28, 28)
    y_train: numpy.array
        Shape: (60000, )
    x_test: numpy.array
        Shape: (10000, 28, 28)
    y_test: numpy.array
        Shape: (10000, )
    """
    try:
        _path = os.path.normpath(path)
        with np.load(_path) as f:
            x_train, y_train = f['x_train'], f['y_train']
            x_test, y_test = f['x_test'], f['y_test']
            print("Loaded data from {}".format(_path))
        return (x_train, y_train), (x_test, y_test)
    except Exception:
        try:
            f = BytesIO(file_io.read_file_to_string(
                filename=path,
                binary_mode=True
            ))
            data = np.load(f)
            with data as f:
                x_train, y_train = f['x_train'], f['y_train']
                x_test, y_test = f['x_test'], f['y_test']
                print("Loaded data from {}".format(path))
            return (x_train, y_train), (x_test, y_test)
        except Exception:
            try:
                from tensorflow.keras.datasets import mnist
                (x_train, y_train), (x_test, y_test) = mnist.load_data()
                return (x_train, y_train), (x_test, y_test)
            except Exception:
                raise Exception("Not Connection to Server: Download manually to ./data/ from {}".format(
                    "https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz"
                ))

Train using ML-Engine (DSP 2.3)

Modeling and ML-Engine

gcp_training_options-overview.png

  • Environment Variables with absolute paths to relevant folders:
    • PKG_NAME: self-contained package to be exported into site-packages in the venv
    • DATA, OUTDIR: data folder and where to store checkpoints (logs, weights, graph)
    • PWD: where your project folder lies
    • JOBNAME: ID for ML-Engine
    • BUCKET: ID of the Bucket
    • TIER: type of cluster

Adding Code snippets

gcp_training_options-overview.png

Schematic Overview

GCP for Data Scientists

Contents ML-Engine Section

  • Training
    • local (on your machine)
    • on cluster (submitting a job)
  • Hyperparameter search (on cluster)

Training on your local machine with your python env

  • Set local folders
In [ ]:
data_local = os.path.join(os.getcwd(),'data', 'mnist', 'raw', 'mnist.npz')
OUTDIR_local = os.path.join(os.getcwd(),'trained', PKG_NAME)
os.environ['OUTDIR_LOCAL'] = OUTDIR_local
os.environ['DATA_LOCAL'] = data_local

print("Local Data Directory:\t {}".format(os.environ['DATA_LOCAL']))
print("Local Output Dir:\t {}".format(os.environ['OUTDIR_LOCAL']))
In [ ]:
import shutil
shutil.rmtree(OUTDIR_local, ignore_errors=True)
os.makedirs(name= OUTDIR_local, exist_ok=True)
os.listdir(OUTDIR_local)

Running the Python module without gcloud ml-engine

  • Entry point is defined in task.py
    • parses command line arguments
  • conda env has to be active
In [ ]:
%%cmd
python -m src.%PKG_NAME%.task ^
   --data_path=%DATA_LOCAL% ^
   --output_dir=%OUTDIR_LOCAL% ^
   --train_steps=1500 ^
   --job_dir=tmp
echo "Saved Model, ckpts, exported model to: %OUTDIR%"
dir %OUTDIR_LOCAL% 

Set the hidden_units parameter

  • change the model architecture
  • here the previous model is deleted -> later, several models will be compared
In [ ]:
import shutil
shutil.rmtree(OUTDIR_local, ignore_errors=True)
os.makedirs(name= OUTDIR_local, exist_ok=True)
In [ ]:
%%cmd
python -m src.%PKG_NAME%.task ^
   --data_path=%DATA_LOCAL% ^
   --output_dir=%OUTDIR_LOCAL% ^
   --train_steps=1500 ^
   --train_batch_size 128  ^
   --learning_rate 0.01 ^
   --hidden_units "256 128 64" ^
   --job_dir=tmp
echo "Saved Model, ckpts, exported model to: %OUTDIR%"
dir %OUTDIR_LOCAL%

Saved Model

In [ ]:
os.listdir(os.path.normpath("{}/export/exporter".format(OUTDIR_local)))[-1]

And we would be ready to deploy

... but of course not without looking at performance metrics or predictions!

Training using gcloud ml-engine local train

  • continue training using ml-engine local
  • needs full-paths for out-dir: Add $PWD
In [ ]:
import shutil
shutil.rmtree(OUTDIR_local, ignore_errors=True)
os.makedirs(name= OUTDIR_local, exist_ok=True)
In [ ]:
%%cmd
gcloud ml-engine local train ^
   --module-name=%PKG_NAME%.task ^
   --package-path=src\%PKG_NAME% ^
   -- ^
   --data_path=%DATA_LOCAL% ^
   --output_dir=%OUTDIR_LOCAL% ^
   --train_steps=1700 ^
   --job_dir=.\tmp 
dir %OUTDIR_LOCAL% 
In [ ]:
!gcloud ml-engine local train  --help

Training in the cloud using gcloud ml-engine jobs submit training

  • a copy of the data is in Google Storage (buckets)
  • gcloud ml-engine output is saved to OUTDIR in Google Storage
    • checkpoints (logs)
    • model graph and weights
  • data is copied to Google Storage (see console)
In [ ]:
#Set JOBNAME
import datetime
JOBNAME = 'mnist_' + datetime.datetime.now().strftime("%y%m%d_%H%M%S")
%env JOBNAME {JOBNAME}
# Set new OUTPUT and DATA directory in GS
OUTDIR = '/'.join(['gs:/', BUCKET, JOBNAME])
DATA = '/'.join(['gs:/', BUCKET, PKG_NAME, 'data', 'mnist.npz'])
%env OUTDIR $OUTDIR
%env DATA $DATA

Data Transfer

In [ ]:
%%cmd 
gsutil -m cp %cd%\data\mnist\raw\mnist.npz %DATA%
In [ ]:
!gsutil ls "%DATA%" 

ml-engine on cluster

  • set JOBNAME and decide which tier to use
In [ ]:
%env TIER BASIC
In [ ]:
%%cmd
echo "OUT: %OUTDIR%, Region: %REGION%, JOBNAME: %JOBNAME%"
gsutil -m rm -rf %OUTDIR%

gcloud ml-engine jobs submit training %JOBNAME% ^
   --region=%REGION% ^
   --module-name=%PKG_NAME%.task ^
   --package-path=src\%PKG_NAME% ^
   --staging-bucket=gs://%BUCKET% ^
   --scale-tier=%TIER% ^
   --python-version 3.5 ^
   --runtime-version=%TFVERSION% ^
   -- ^
   --data_path=%DATA% ^
   --output_dir=%OUTDIR% ^
   --train_steps=5000 ^
   --job_dir=%OUTDIR%/jobs 

Fetch logs from ml-engine job

In [ ]:
!gcloud ml-engine jobs describe %JOBNAME%
In [ ]:
!gcloud ml-engine jobs stream-logs %JOBNAME%

Hyperparameter search (on cluster)

  • Bayesian approach to find optimal hyperparameters, see

    Golovin et.al (2017): Google Vizier: A Service for Black-Box Optimization

  • consecutive search, here
    • 2 trials in parallel
    • a total of 30 trials
  • see hyperp_config.yaml:

    • train_batch_size
    • hidden_units
  • Pick an algorithm to search Hyperparameter space

    1. ALGORITHM_UNSPECIFIED: Bayesian Search
    2. GRID_SEARCH
    3. RANDOM_SEARCH

Configure Search in hyperp_config.yaml:

In [ ]:
%%writefile hyperp_config.yaml
trainingInput:
  hyperparameters:
    goal: MAXIMIZE
    hyperparameterMetricTag: accuracy
    maxTrials: 30
    maxParallelTrials: 4
    algorithm: ALGORITHM_UNSPECIFIED
    params:
      - parameterName: train_batch_size
        type: INTEGER
        minValue: 64
        maxValue: 512
        scaleType: UNIT_LINEAR_SCALE
      - parameterName: hidden_units
        type: CATEGORICAL
        categoricalValues: ["256 128 64", "128 64 32", "512 256 128 64", "256 128 64 32"]
      - parameterName: learning_rate
        type: DOUBLE
        minValue: 0.0001
        maxValue: 0.1
        scaleType: UNIT_LOG_SCALE
In [ ]:
# %load hyperp_config.yaml
trainingInput:
  hyperparameters:
    goal: MAXIMIZE
    hyperparameterMetricTag: accuracy
    maxTrials: 30
    maxParallelTrials: 4
    algorithm: ALGORITHM_UNSPECIFIED
    params:
      - parameterName: train_batch_size
        type: INTEGER
        minValue: 64
        maxValue: 512
        scaleType: UNIT_LINEAR_SCALE
      - parameterName: hidden_units
        type: CATEGORICAL
        categoricalValues: ["256 128 64", "128 64 32", "512 256 128 64", "256 128 64 32"]
      - parameterName: learning_rate
        type: DOUBLE
        minValue: 0.0001
        maxValue: 0.1
        scaleType: UNIT_LOG_SCALE
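With maxParallelTrials greater than one, all trials would write into the same output_dir unless the trainer separates them. A common pattern (a sketch, not part of the package above) is to read the trial number from the TF_CONFIG environment variable that ML-Engine sets and append it to the output directory:

import json
import os

tf_config = json.loads(os.environ.get('TF_CONFIG', '{}'))
trial = tf_config.get('task', {}).get('trial', '')
output_dir = 'gs://my-bucket/mnist_hyper'  # placeholder; normally args['output_dir']
if trial:
    output_dir = '/'.join([output_dir, str(trial)])
print(output_dir)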

Create unique jobname: JOBNAME_HYPER

  • decide which TIER to use
In [ ]:
# Set JOBNAME environment variable
import datetime
JOBNAME_HYPER = "mnist_{}_hyper".format(datetime.datetime.now().strftime("%y%m%d_%H%M%S"))
%env JOBNAME_HYPER {JOBNAME_HYPER}
# Set new OUTPUT and DATA directory in GS
OUTDIR_HYPER = '/'.join(['gs:/', BUCKET, JOBNAME_HYPER])
DATA = '/'.join(['gs:/', BUCKET, PKG_NAME, 'data', 'mnist.npz'])
%env OUTDIR_HYPER $OUTDIR_HYPER
%env DATA $DATA
%env TIER STANDARD_1

Start Bayesian Hyperparameter Search:

  • add the --config parameter with hyperp_config.yaml as its argument
  • other parameters can be added to hyperp_config.yaml, see the docs on submitting
In [ ]:
%%cmd
echo %OUTDIR_HYPER% %DATA% %REGION% %JOBNAME_HYPER%
gcloud ml-engine jobs submit training %JOBNAME_HYPER% ^
   --region %REGION% ^
   --module-name %PKG_NAME%.task ^
   --package-path %cd%/src/%PKG_NAME% ^
   --staging-bucket gs://%BUCKET% ^
   --scale-tier %TIER% ^
   --python-version 3.5 ^
   --runtime-version %TFVERSION% ^
   --config hyperp_config.yaml ^
   -- ^
   --data_path %DATA% ^
   --output_dir %OUTDIR_HYPER% ^
   --train_steps 5000 ^
   --job_dir %OUTDIR_HYPER%/jobs
In [ ]:
!gcloud ml-engine jobs stream-logs %JOBNAME_HYPER%

Get results from job using API

See client documentation on ml-engine and ml.projects().jobs().get() method.

Using requests-package behind a proxy

In [ ]:
import subprocess
import requests
import json
import os
In [ ]:
url = 'https://ml.googleapis.com/v1/projects/{project}/jobs'.format(project=PROJECT)
headers = {
   'Content-Type': 'application/json',
   'Authorization':  'Bearer {}'.format(subprocess.run('gcloud auth print-access-token', shell=True, check=True, 
                                                       stdout=subprocess.PIPE).stdout.decode().replace(os.linesep, ''))
}
json_response = requests.get(url=url, headers=headers)
json.loads(json_response.text)
In [ ]:
os.environ['JOBNAME_HYPER'] = 'mnist_191128_142331_hyper' #'mnist_{}_hyper'.format('191128_142331') #mnist_191128_142331_hyper
jobname = os.environ['JOBNAME_HYPER']
print(jobname)
url = 'https://ml.googleapis.com/v1/projects/{project}/jobs/{jobname}'.format(project=PROJECT, jobname=jobname)
headers = {
   'Content-Type': 'application/json',
   'Authorization':  'Bearer {}'.format(subprocess.run('gcloud auth print-access-token', shell=True, check=True, 
                                                       stdout=subprocess.PIPE).stdout.decode().replace(os.linesep, ''))
}
print(headers)
json_response = requests.get(url=url, headers=headers)
json.loads(json_response.text)

Using googleapiclient.discovery

In [ ]:
from googleapiclient import discovery
ml = discovery.build('ml', 'v1')
In [ ]:
ml.projects().jobs().list(parent='projects/{}'.format(PROJECT)).execute() 
In [ ]:
from googleapiclient import discovery
from googleapiclient import errors
from pprint import pprint

def get_job_results(jobname, project=PROJECT):
    """
    Builds a discovery client with GCMLE endpoint.
    
    jobname: str
        Jobname used to run GCMLE task
    project: str
        Project-Name
    
    Return
    ------
    response_dict: dict
        Dictionary containing the job creation information and its results
    """
    ml = discovery.build('ml', 'v1')
    endpoint = 'projects/{project}/jobs/{jobname}'.format(project=project, jobname=jobname)
    print("API endpoint: {}".format(endpoint))
    request = ml.projects().jobs().get(name=endpoint)
    # Make the call.
    try:
        response_dict = request.execute()
        pprint(response_dict)
    except errors.HttpError as err:
        print('There was an error getting the job. Check the details:')
        print(err._get_reason())
        raise
    return response_dict
get_job_results(os.environ['JOBNAME_HYPER'])

Excursus: Check Results in TensorBoard

  • metrics and variables are inspected from the logs written during training (checkpoints, ckpt, and summaries)
  • Dashboard on localhost: TensorBoard

Inspect Model trained on your machine by starting a local tensorboard server:

  • tensorboard --logdir trained/pkg_mnist_fnn/

Screenshot of Tensorboard

screenshot Tensorboard

Excursus: Load data from the bucket

  • the binary object has to be read via the BytesIO module
In [ ]:
from google.cloud import storage
from io import BytesIO
import numpy as np

storage_client = storage.Client(project=PROJECT) # use current gcloud PROJECT_ID
bucket = storage_client.get_bucket(BUCKET)
blob = bucket.blob("pkg_mnist_fnn/data/mnist.npz")

data = blob.download_as_string()
data = BytesIO(data)
data = np.load(data)
with data as f: 
    x_train, y_train = f['x_train'], f['y_train']
    x_test, y_test = f['x_test'], f['y_test']
In [ ]:
y_test.shape

Deploy model - from any previous step (DSP 2.5)

  • tf.estimator.LatestExporter is used to store a model for deployment in the cloud
  • See also: tf.estimator.export, tf.saved_model

Link to Console

Check that a model has been saved on your Bucket:
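A hedged alternative to the console check, listing the exported SavedModel files with the storage client (assumes the JOBNAME of the single training job above is still set):

from google.cloud import storage

bucket = storage.Client(project=PROJECT).get_bucket(BUCKET)
prefix = '{}/export/exporter/'.format(JOBNAME)
for blob in bucket.list_blobs(prefix=prefix):
    print(blob.name)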

Get the best model found by hyperparameter tuning

  • get_job_results is defined before
In [ ]:
#%env JOBNAME_HYPER mnist_191128_142331_hyper  # uncomment and set in case you take an old job
job_details = get_job_results(os.environ["JOBNAME_HYPER"])
In [ ]:
best_run = job_details['trainingOutput']['trials'][0]
print("Run with best performance on chosen metrics:\n{}".format(best_run))
In [ ]:
%env TRIAL_ID {best_run['trialId']}
models = !gsutil ls gs://%PROJECT%/%JOBNAME_HYPER%/%TRIAL_ID%/export/exporter/
models

Use best model from Hyper-Parameter Tuning Job (Query is shown before)

In [ ]:
%env MODEL_LOCATION={models[-1]} 

Deploy

Identifier for deployed model:

  • MODEL_NAME
  • MODEL_VERSION
In [ ]:
%env MODEL_NAME "MNIST_MLENGINE"
%env MODEL_VERSION "v2"  

Create: A model (Dataset) has different versions (Tables)

In [ ]:
%%cmd
gcloud ml-engine models   create %MODEL_NAME% --regions %REGION%
gcloud ml-engine versions create %MODEL_VERSION% --model %MODEL_NAME% ^
     --origin %MODEL_LOCATION% ^
     --runtime-version %TFVERSION% ^
     --python-version 3.5

Predictions

  1. Using the Model saved by Python Module
  2. Using Model saved by ml-engine local
  3. Using Model trained online

Tools to get predictions:

  • Command Line Interfaces
    • gcloud ml-engine local predict
    • gcloud ml-engine predict
  • Python Client

Create test images in JSON format

  1. Add filename to config-file
  2. Create file containing N examples
In [ ]:
N=4
testdatafile = "data/mnist/json/ml_engine_testdatafile_N{}.json".format(N)
with open("config.yaml", "r", encoding = "utf8") as f:
    config = yaml.safe_load(f)
with open("config.yaml", "w", encoding = "utf8") as f:
    config['testdatafile'] = testdatafile
    yaml.dump(config, stream=f,  default_flow_style=False)
TEST_DATA_JSON = testdatafile
%env TEST_DATA_JSON $testdatafile
In [ ]:
# Create a file with 4 test images
import numpy as np
import json
from src.pkg_mnist_fnn.utils import load_data
from src.pkg_mnist_fnn.model import parse_images
(_,_), (x_test, y_test) = load_data(path='data/mnist/raw/mnist.npz')
test_indices = np.random.randint(low=0, high=len(y_test), size=N)
x_test, y_test = x_test[test_indices], y_test[test_indices]
x_test = parse_images(x_test).tolist()

#eol = os.linesep
#print(eol)
n_lines = len(y_test)
with open(testdatafile, "w") as f:
    for image, label in zip(x_test, y_test):
        _dict = {"x": image} #, "y": int(label)}
        f.write(json.dumps(_dict)+ "\n")
print("Wrote to {}".format(testdatafile))

Let's look at our four examples

In [ ]:
from src.utils.mnist_utils import plot_mnist_testdata
plot_mnist_testdata(TEST_DATA_JSON)

ML-Engine: ml-engine local predict

  • Using Model saved
    • Python module
    • ml-engine local
In [ ]:
model_dir = os.listdir("{}/export/exporter".format(OUTDIR_local))[-1]
%env model_dir=$model_dir
In [ ]:
%%cmd
set MODEL_LOCATION=%OUTDIR_LOCAL%\export\exporter\%model_dir%\
echo "Selected Model:  %MODEL_LOCATION%" 
gcloud ml-engine local predict ^
    --model-dir=%MODEL_LOCATION% ^
    --json-instances=%TEST_DATA_JSON% ^
    --verbosity debug > data/test_predictions
In [ ]:
%%cmd
notepad data/test_predictions

Online Prediction - Command Line

  • same output format as before, check Console: link
In [ ]:
%%cmd  
gcloud ml-engine predict --model=MNIST_MLENGINE --version=v1 --json-instances=%TEST_DATA_JSON%

Online Predictions - Batch

  • copy the example file to Google Storage first (gsutil cp, see below)
  • data-format 'text' for JSON format
  • output-path: GS folder where results will be saved
  • input-paths: file location (can be a folder with several files)
In [ ]:
import datetime
JOBNAME_BATCH_PRED = 'BATCH_' + datetime.datetime.now().strftime("%y%m%d_%H%M%S")
%env JOBNAME_BATCH_PRED {JOBNAME_BATCH_PRED}
%env DATA_FORMAT text
%env OUTPUT_PATH {'/'.join([os.path.split(OUTDIR)[0], "batch_pred/"])}
%env TEST_DATA_GS {'/'.join([os.path.split(DATA)[0], os.path.split(TEST_DATA_JSON)[1]])}_

Copy files

In [ ]:
!gsutil cp data/mnist/json/ml_engine_testdatafile_N4.json %TEST_DATA_GS%

Submit job using gcloud functionality

In [ ]:
%%cmd
gcloud ml-engine jobs submit prediction %JOBNAME_BATCH_PRED%  --model=MNIST_MLENGINE --version=v1 --input-paths=%TEST_DATA_GS% --output-path %OUTPUT_PATH%  --region %REGION% --data-format %DATA_FORMAT%

Retrieve results from batch and parse them

In [ ]:
files = !gsutil ls %OUTPUT_PATH%
print(files)  
In [ ]:
from google.cloud import storage
import json
mybucket= storage.Client(project=PROJECT).get_bucket('{}'.format(BUCKET))
file = files[1].split("{}".format(BUCKET + "/"))[1]
print("Get file {}".format(file))
blob= mybucket.blob(file)
result = blob.download_as_string()

result = [json.loads(x) for x in (result.decode().split("\n"))[:-1]]
print(result[0])

Online Predictions

In [ ]:
MODEL_NAME = 'MNIST_MLENGINE' 
VERSION = 'v1'
print(PROJECT)

Load data into python:

In [ ]:
import json
instances = []
with open(TEST_DATA_JSON, "r") as f:
    data = f.readlines()
instances = [json.loads(x) for x in data]   # for discovery-client
data = [image['x'] for  image in instances] # for requests-package

Using requests-package behind a proxy

In [ ]:
import subprocess
import requests
import os
url = 'https://ml.googleapis.com/v1/projects/{project}/models/{model}/versions/{version}:predict'.format(project=PROJECT,
                                                                                                         model=MODEL_NAME,
                                                                                                         version=VERSION)
headers = {
   'Content-Type': 'application/json',
   'Authorization':  'Bearer {}'.format(subprocess.run('gcloud auth print-access-token', shell=True, check=True, 
                                                       stdout=subprocess.PIPE).stdout.decode().replace(os.linesep, ''))
}
request_data = {"instances":
    data
}
print(headers)
json_response = requests.post(url=url, data=json.dumps(request_data), headers=headers)
pprint(json.loads(json_response.text))

Using googleapiclient.discovery

  • fails behind a proxy due to SSL verification (which could not be deactivated)

Authentication
In [ ]:
from googleapiclient import discovery
api = discovery.build(serviceName='ml', version='v1',
                      #http= httplib2.Http(disable_ssl_certificate_validation=True),
                      discoveryServiceUrl='https://www.googleapis.com/discovery/v1/apis/{api}/{apiVersion}/rest',
                      #credentials=cred,  # SDK credentials
                     )
This prints a warning like:

UserWarning: Your application has authenticated using end user credentials from Google Cloud SDK. We recommend that most server applications use service accounts instead. If your application continues to use end user credentials from Cloud SDK, you might receive a "quota exceeded" or "API not enabled" error. For more information about service accounts, see https://cloud.google.com/docs/authentication/

Get predictions for samples

In [ ]:
project_id = 'projects/{project}/models/{model}/versions/{version}'.format(project=PROJECT, model=MODEL_NAME, version=VERSION)
print("Endpoint to use: {}\n".format(project_id))
request_data = {"instances":
    instances
}
request = api.projects().predict(body=request_data, name=project_id).execute()
pprint(request)
In [ ]:
for i, pred in enumerate(request['predictions']):
    print("Predicted class: {}, True Class:\t{}".format(
        pred['classes'][0], 
        y_test[i]))
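As a quick sanity check, a sketch of the share of correct predictions over these examples (assumes request and y_test from the cells above):

import numpy as np

pred_classes = [int(p['classes'][0]) for p in request['predictions']]
true_classes = [int(y) for y in y_test[:len(pred_classes)]]
accuracy = np.mean(np.array(pred_classes) == np.array(true_classes))
print("Accuracy on {} samples: {:.0%}".format(len(pred_classes), accuracy))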

Recap

gcp_training_options-gcp_services.png

Outlook

  • Add different model types
    • different layers of abstraction in tensorflow
    • sklearn
  • Show how to use ml-engine in SQL in BigQuery

Appendix

Notes on Jupyter Slides

  • Activate: View -> Cell Toolbar -> Slideshow
  • Install nbextensions into base conda environment
  • RISE for interactive presentations
    • using conda: conda install -c conda-forge rise
    • activate scrolling in the Notebook-Metadata, see link
    • adapt width and height of your slides to your machine and needs. link