Customized preprocessing and postprocessing (in vLLM)

In this tutorial, you will deploy a model to SageMaker using the LMI (Large Model Inference) container from the AWS Deep Learning Containers (DLC) and run inference with it.

Please make sure the following permissions are granted before running the notebook:

  • S3 bucket push access
  • SageMaker access

Step 1: Let's upgrade the SageMaker SDK and import dependencies

%pip install sagemaker --upgrade  --quiet
import boto3
import sagemaker
from sagemaker import Model, image_uris, serializers, deserializers

role = sagemaker.get_execution_role()  # execution role for the endpoint
sess = sagemaker.session.Session()  # sagemaker session for interacting with different AWS APIs
region = sess.boto_region_name  # region name of the current SageMaker Studio environment
account_id = sess.account_id()  # account_id of the current SageMaker Studio environment

Step 2: Start preparing model artifacts

In the LMI container, we expect some artifacts to help set up the model:

  • serving.properties (required): Defines the model server settings
  • model.py (optional): A Python file that defines the core inference logic
  • requirements.txt (optional): Any additional pip wheels that need to be installed

%%writefile serving.properties
engine=Python
option.model_id=TheBloke/Llama-2-7B-fp16
option.tensor_parallel_degree=4
option.max_rolling_batch_size=16
option.rolling_batch=vllm
option.trust_remote_code=true
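
Before packaging, it can help to sanity-check the settings above. The following is a minimal sketch in plain Python, not part of the LMI container: `parse_properties` and `GPUS_ON_INSTANCE` are hypothetical names, and the GPU count assumes the ml.g5.12xlarge instance (4 GPUs) used later in this tutorial.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


SERVING_PROPERTIES = """\
engine=Python
option.model_id=TheBloke/Llama-2-7B-fp16
option.tensor_parallel_degree=4
option.max_rolling_batch_size=16
option.rolling_batch=vllm
option.trust_remote_code=true
"""

# Assumption: the endpoint runs on ml.g5.12xlarge, which exposes 4 GPUs.
GPUS_ON_INSTANCE = 4

props = parse_properties(SERVING_PROPERTIES)
tp_degree = int(props["option.tensor_parallel_degree"])
if tp_degree > GPUS_ON_INSTANCE:
    raise ValueError(
        f"tensor_parallel_degree={tp_degree} exceeds {GPUS_ON_INSTANCE} available GPUs"
    )
print(props)
```

If you later switch to a smaller instance type, lowering `GPUS_ON_INSTANCE` in this check flags a tensor parallel degree that no longer fits.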

In this step, we will try to override the default HuggingFace handler provided by DJLServing. We will replace the output formatter with custom_output_formatter, which outputs the token id, text, and log probability instead of just the text.

We will also replace the input formatter with custom_input_formatter to accept "prompt" instead of "inputs" in the request (e.g. {"prompt": "...", "parameters": {}} is now a valid request instead of {"inputs": "...", "parameters": {}}).

You can replace either of these functions with your own custom input formatter and output formatter. The only restrictions are as follows:

Input Formatter: returns a 5-tuple of the following:

  • A list of strings (the prompts)
  • A list of ints (the number of prompts in each request)
  • A list of dictionaries (one per prompt, containing settings like top_k, temperature, etc.)
  • A dictionary (for error logging)
  • A list of Input objects (just use inputs.get_batches())
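
To make the shape of that return value concrete, here is a minimal sketch that runs without djl_python. It works on already-decoded request dictionaries, so it returns only the first four elements of the tuple (the fifth, the batch of Input objects, needs the real runtime). `split_request` and `format_batch` are hypothetical helper names, and unlike the handler in this tutorial, this sketch explicitly records an error when "prompt" is missing.

```python
def split_request(input_map):
    """Return (prompts, per-prompt parameters) for one decoded request body."""
    input_map = dict(input_map)  # copy so the caller's dict is left untouched
    prompts = input_map.pop("prompt")  # raises KeyError if "prompt" is absent
    if not isinstance(prompts, list):
        prompts = [prompts]
    params = input_map.pop("parameters", {})
    return prompts, [params] * len(prompts)


def format_batch(requests):
    """Flatten a batch of request dicts into prompts, sizes, parameters, errors."""
    input_data, input_size, parameters, errors = [], [], [], {}
    for i, request in enumerate(requests):
        try:
            prompts, params = split_request(request)
        except KeyError as e:
            input_size.append(0)  # a failed request contributes no prompts
            errors[i] = f"missing field: {e}"
            continue
        input_data.extend(prompts)
        input_size.append(len(prompts))
        parameters.extend(params)
    return input_data, input_size, parameters, errors


batch = [
    {"prompt": "a", "parameters": {"top_k": 5}},
    {"inputs": "b"},  # old field name, rejected by this sketch
    {"prompt": ["c", "d"]},
]
input_data, input_size, parameters, errors = format_batch(batch)
print(input_data, input_size, parameters, errors)
```

Note that the sizes list keeps one entry per request, including failed ones, so the service can map generated outputs back to the request that produced them.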

Output Formatter: takes 5 required parameters (the service passes these into the output formatter) and returns a string:

  • A Token object (exposing id, text, and log_prob, as used in model.py below)
  • A boolean denoting if this is the first token
  • A boolean denoting if this is the last token
  • A dictionary with miscellaneous information (e.g. finish reason)
  • A string containing previously generated tokens
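
An output formatter can be exercised locally before deployment. The sketch below is an illustration under assumptions: `FakeToken` is a hypothetical stand-in for the real Token object (it only mimics the id, text, and log_prob attributes), `text_only_formatter` is an example formatter that is not used elsewhere in this tutorial, and the driver loop only imitates how the service might call the formatter once per token.

```python
import json
from dataclasses import dataclass


@dataclass
class FakeToken:
    """Hypothetical stand-in for the Token object passed in by the service."""
    id: int
    text: str
    log_prob: float


def text_only_formatter(token, first_token, last_token, details, generated_tokens):
    """Emit one JSON line per token, containing only the token text."""
    line = {"text": token.text}
    if first_token:
        line["start"] = True
    if last_token:
        line["finish_reason"] = details.get("finish_reason")
    return json.dumps(line) + "\n"


# Simulate the per-token calls for a two-token generation.
tokens = [FakeToken(1, "Hello", -0.1), FakeToken(2, " world", -0.3)]
generated = ""
chunks = []
for i, tok in enumerate(tokens):
    first = i == 0
    last = i == len(tokens) - 1
    details = {"finish_reason": "length"} if last else {}
    chunks.append(text_only_formatter(tok, first, last, details, generated))
    generated += tok.text

output = "".join(chunks)
print(output, end="")
```

Returning one newline-terminated JSON object per token keeps the response easy to split on the client side, which is the same convention the formatter in model.py follows.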

%%writefile model.py
from djl_python.huggingface import HuggingFaceService
from djl_python.inputs import Input
from djl_python.outputs import Output
from djl_python.encode_decode import encode, decode
import logging
import json
import types

_service = HuggingFaceService()

def custom_output_formatter(token, first_token, last_token, details, generated_tokens):
    """
    Replace this function with your custom output formatter.

    Args:
        token (Token): Token object
        first_token (bool): If first token
        last_token (bool): If last token
        details (dict): Miscellaneous information
        generated_tokens (str): Previously generated tokens

    Returns:
        (str): Response string

    """
    result = {"token_id": token.id, "token_text": token.text, "token_log_prob": token.log_prob}
    if last_token:
        result["finish_reason"] = details["finish_reason"]
    return json.dumps(result) + "\n"

def custom_input_formatter(self, inputs):
    """
    Replace this function with your custom input formatter.

    Args:
        inputs (Input): The batched request data

    Returns:
        (tuple): input_data (list), input_size (list), parameters (list), errors (dict), batch (list)
    """
    input_data = []
    input_size = []
    parameters = []
    errors = {}
    batch = inputs.get_batches()
    for i, item in enumerate(batch):
        try:
            content_type = item.get_property("Content-Type")
            input_map = decode(item, content_type)
        except Exception as e:  # pylint: disable=broad-except
            logging.warning(f"Parse input failed: {i}")
            input_size.append(0)
            errors[i] = str(e)
            continue

        _inputs = input_map.pop("prompt", input_map)
        if not isinstance(_inputs, list):
            _inputs = [_inputs]
        input_data.extend(_inputs)
        input_size.append(len(_inputs))

        _param = input_map.pop("parameters", {})
        if "cached_prompt" in input_map:
            _param["cached_prompt"] = input_map.pop("cached_prompt")
        if "seed" not in _param:
            # set server provided seed if seed is not part of request
            if item.contains_key("seed"):
                _param["seed"] = item.get_as_string(key="seed")
        for _ in range(input_size[i]):
            parameters.append(_param)

    return input_data, input_size, parameters, errors, batch

def handle(inputs: Input):
    """
    Default handler function
    """
    if not _service.initialized:
        # stateful model
        props = inputs.get_properties()
        props['output_formatter'] = custom_output_formatter
        _service.initialize(props)
        _service.parse_input = types.MethodType(custom_input_formatter, _service)

    if inputs.is_empty():
        # initialization request
        return None

    return _service.inference(inputs)

%%sh
mkdir mymodel
mv serving.properties mymodel/
mv model.py mymodel/
tar czvf mymodel.tar.gz mymodel/
rm -rf mymodel
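
If a shell is not available, the same packaging step can be sketched with Python's standard tarfile module. This is an illustrative equivalent rather than part of the original notebook: `package_model` is a hypothetical helper, and the demo writes placeholder files into a temporary directory instead of touching your real serving.properties and model.py.

```python
import tarfile
import tempfile
from pathlib import Path


def package_model(src_dir, tar_path):
    """Create a gzip tarball with src_dir's files under a top-level 'mymodel/' folder."""
    with tarfile.open(str(tar_path), "w:gz") as tar:
        tar.add(str(src_dir), arcname="mymodel")
    # Re-open the archive to list what was actually written.
    with tarfile.open(str(tar_path), "r:gz") as tar:
        return sorted(tar.getnames())


with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "src"
    src.mkdir()
    # Placeholder contents, for demonstration only.
    (src / "serving.properties").write_text("engine=Python\n")
    (src / "model.py").write_text("# placeholder\n")
    members = package_model(src, Path(tmp) / "mymodel.tar.gz")

print(members)
```

Listing the archive members after writing is a cheap way to confirm that both files sit under the mymodel/ folder, matching the layout produced by the shell commands above.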

Step 3: Start building SageMaker endpoint

In this step, we will build a SageMaker endpoint from scratch.

Getting the container image URI

The image is selected from the available Large Model Inference (LMI) DLCs:

image_uri = image_uris.retrieve(
        framework="djl-deepspeed",
        region=sess.boto_session.region_name,
        version="0.27.0"
    )

Upload the artifact to S3 and create a SageMaker model

s3_code_prefix = "large-model-lmi/code"
bucket = sess.default_bucket()  # bucket to house artifacts
code_artifact = sess.upload_data("mymodel.tar.gz", bucket, s3_code_prefix)
print(f"S3 Code or Model tar ball uploaded to --- > {code_artifact}")

model = Model(image_uri=image_uri, model_data=code_artifact, role=role)

Create SageMaker endpoint

You need to specify the instance type to use and the endpoint name.

instance_type = "ml.g5.12xlarge"
endpoint_name = sagemaker.utils.name_from_base("lmi-model")

model.deploy(initial_instance_count=1,
             instance_type=instance_type,
             endpoint_name=endpoint_name,
             # container_startup_health_check_timeout=3600
            )

# our requests are sent as JSON, so we specify the serializer; responses are returned as raw bytes
predictor = sagemaker.Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sess,
    serializer=serializers.JSONSerializer(),
)

Step 4: Test and benchmark the inference

Because we changed the input preprocessing, the following request no longer works: the "inputs" field is no longer recognized.

predictor.predict(
    {"inputs": "Large model inference is", "parameters": {}}
)

But this will work:

predictor.predict(
    {"prompt": "Large model inference is", "parameters": {}}
)

Notice that the output format differs from that of an example without customized postprocessing, because we changed the output formatter.
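
Since custom_output_formatter returns one JSON object per token followed by a newline, the response can be read as JSON lines. The sketch below assumes the predictor hands back the raw body as bytes and that the body is simply the formatter outputs concatenated; `parse_token_stream` is a hypothetical helper, and the sample payload is made up for illustration, not real model output.

```python
import json


def parse_token_stream(raw):
    """Decode a JSON-lines body (bytes or str) into a list of per-token dicts."""
    text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Made-up sample body in the shape produced by custom_output_formatter.
sample = (
    b'{"token_id": 10, "token_text": " fun", "token_log_prob": -0.5}\n'
    b'{"token_id": 11, "token_text": "!", "token_log_prob": -1.25, '
    b'"finish_reason": "length"}\n'
)

records = parse_token_stream(sample)
generated_text = "".join(r["token_text"] for r in records)
finish_reason = records[-1].get("finish_reason")
print(generated_text, finish_reason)
```

In the notebook, you would pass the return value of predictor.predict(...) to `parse_token_stream` in place of `sample`, provided the endpoint's body matches the assumption above.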

Clean up the environment

sess.delete_endpoint(endpoint_name)
sess.delete_endpoint_config(endpoint_name)
model.delete_model()