Customized preprocessing and postprocessing (in vLLM)¶
In this tutorial, you will use LMI container from DLC to SageMaker and run inference with it.
Please make sure the following permission granted before running the notebook:
- S3 bucket push access
- SageMaker access
Step 1: Let's bump up SageMaker and import stuff¶
%pip install sagemaker --upgrade --quiet
import boto3
import sagemaker
from sagemaker import Model, image_uris, serializers, deserializers
role = sagemaker.get_execution_role() # execution role for the endpoint
sess = sagemaker.session.Session() # sagemaker session for interacting with different AWS APIs
region = sess._region_name # region name of the current SageMaker Studio environment
account_id = sess.account_id() # account_id of the current SageMaker Studio environment
Step 2: Start preparing model artifacts¶
In LMI contianer, we expect some artifacts to help setting up the model - serving.properties (required): Defines the model server settings - model.py (optional): A python file to define the core inference logic - requirements.txt (optional): Any additional pip wheel need to install
%%writefile serving.properties
engine=Python
option.model_id=TheBloke/Llama-2-7B-fp16
option.tensor_parallel_degree=4
option.max_rolling_batch_size=16
option.rolling_batch=vllm
option.trust_remote_code=true
In this step, we will try to override the default HuggingFace handler provided by DJLServing. We will replace the output formatter with custom_output_formatter
, which outputs the token id, text, and log probability instead of just the text.
We will also replace the input formatter with custom_input_formatter
to accept "prompt" instead of "inputs" in the request (e.g. {"prompt": "...", "parameters": {}}
is now a valid request instead of {"inputs": "...", "parameters": {}}
You can replace either of these functions with your own custom input formatter and output formatter. The only restrictions are as follows:
Input Formatter
- Returns a 5-tuple of the following:
- A list of strings (prompt)
- An int (size of input)
- A dictionary (containing settings like top_k, temperature, etc.)
- A dictionary (for error logging)
- A list of Input objects (just use inputs.get_batches()
)
Output Formatter - 5 required parameters (these will be sent into the output formatter by the service): - a Token object (defined here) - a boolean denoting if this is the first token - a boolean denoting if this is the last token - a dictionary with miscellaneous information (e.g. finish reason) - a string containing previously generated tokens - Returns a string
%%writefile model.py
from djl_python.huggingface import HuggingFaceService
from djl_python.inputs import Input
from djl_python.outputs import Output
from djl_python.encode_decode import encode, decode
import logging
import json
import types
_service = HuggingFaceService()
def custom_output_formatter(token, first_token, last_token, details, generated_tokens):
"""
Replace this function with your custom output formatter.
Args:
token (Token): Token object
first (bool): If first token
last (bool): If last token
aux (dict): Miscellaneous information
prev_response (str): Previously generated tokens
Returns:
(str): Response string
"""
result = {"token_id": token.id, "token_text": token.text, "token_log_prob": token.log_prob}
if last_token:
result["finish_reason"] = details["finish_reason"]
return json.dumps(result) + "\n"
def custom_input_formatter(self, inputs):
"""
Replace this function with your custom input formatter.
Args:
data (obj): The request data, dict or string
Returns:
(tuple): input_data (list), input_size (list), parameters (dict), errors (dict), batch (list)
"""
input_data = []
input_size = []
parameters = []
errors = {}
batch = inputs.get_batches()
for i, item in enumerate(batch):
try:
content_type = item.get_property("Content-Type")
input_map = decode(item, content_type)
except Exception as e: # pylint: disable=broad-except
logging.warning(f"Parse input failed: {i}")
input_size.append(0)
errors[i] = str(e)
continue
_inputs = input_map.pop("prompt", input_map)
if not isinstance(_inputs, list):
_inputs = [_inputs]
input_data.extend(_inputs)
input_size.append(len(_inputs))
_param = input_map.pop("parameters", {})
if "cached_prompt" in input_map:
_param["cached_prompt"] = input_map.pop("cached_prompt")
if not "seed" in _param:
# set server provided seed if seed is not part of request
if item.contains_key("seed"):
_param["seed"] = item.get_as_string(key="seed")
for _ in range(input_size[i]):
parameters.append(_param)
return input_data, input_size, parameters, errors, batch
def handle(inputs: Input):
"""
Default handler function
"""
if not _service.initialized:
# stateful model
props = inputs.get_properties()
props['output_formatter'] = custom_output_formatter
_service.initialize(props)
_service.parse_input = types.MethodType(custom_input_formatter, _service)
if inputs.is_empty():
# initialization request
return None
return _service.inference(inputs)
%%sh
mkdir mymodel
mv serving.properties mymodel/
mv model.py mymodel/
tar czvf mymodel.tar.gz mymodel/
rm -rf mymodel
Step 3: Start building SageMaker endpoint¶
In this step, we will build SageMaker endpoint from scratch
Getting the container image URI¶
image_uri = image_uris.retrieve(
framework="djl-deepspeed",
region=sess.boto_session.region_name,
version="0.27.0"
)
Upload artifact on S3 and create SageMaker model¶
s3_code_prefix = "large-model-lmi/code"
bucket = sess.default_bucket() # bucket to house artifacts
code_artifact = sess.upload_data("mymodel.tar.gz", bucket, s3_code_prefix)
print(f"S3 Code or Model tar ball uploaded to --- > {code_artifact}")
model = Model(image_uri=image_uri, model_data=code_artifact, role=role)
4.2 Create SageMaker endpoint¶
You need to specify the instance to use and endpoint names
instance_type = "ml.g5.12xlarge"
endpoint_name = sagemaker.utils.name_from_base("lmi-model")
model.deploy(initial_instance_count=1,
instance_type=instance_type,
endpoint_name=endpoint_name,
# container_startup_health_check_timeout=3600
)
# our requests and responses will be in json format so we specify the serializer and the deserializer
predictor = sagemaker.Predictor(
endpoint_name=endpoint_name,
sagemaker_session=sess,
serializer=serializers.JSONSerializer(),
)
Step 5: Test and benchmark the inference¶
Since we've changed the input preprocessing, the following will no longer work since the "inputs" field is no longer recognized:
predictor.predict(
{"inputs": "Large model inference is", "parameters": {}}
)
But this will work:
predictor.predict(
{"prompt": "Large model inference is", "parameters": {}}
)
Notice that the output format looks different compared to the output format in an example without customized postprocessing because we changed the output formatter.
Clean up the environment¶
sess.delete_endpoint(endpoint_name)
sess.delete_endpoint_config(endpoint_name)
model.delete_model()