Async Processor Library

Creating an Async Service using the Async Processor library

This section will guide you on creating an Async Service using the Async Processor library. This service will be capable of processing asynchronous requests efficiently. Please follow these steps:

Step 1: Install Async Processor

Before you can create an Async Service, you need to install the Async Processor library, along with the SQS (Simple Queue Service) support:

pip install "async_processor[sqs]"

Step 2: Write the Processor

from async_processor import (  
    InputMessage,  
    Processor,  
)

class MultiplicationProcessor(Processor):  
    def process(self, input_message: InputMessage) -> int:  
        body = input_message.body  
        return body["x"] * body["y"]

app = MultiplicationProcessor().build_app()

To create an Async Service, you must define a Processor that handles processing asynchronous requests:

  1. Inherit Processor Class:
    • Create a new class (e.g., MultiplicationProcessor) that inherits from the provided Processor class. This class handles async request processing.
  2. Override process Method:
    • Inside your custom processor class, implement the process method. It specifies how async requests should be processed. This method takes an InputMessage type argument and returns the processed result.
    • In the example, it multiplies two numbers (x and y).
  3. Build the App:
    • Create an instance of your custom processor class (e.g., MultiplicationProcessor().build_app()). This instance represents your async service.
    • Building the app configures and sets up endpoints for your Async Service.

Here's an example of how to write a processor that multiplies two numbers:

Step 3: Run the Async Service

You can use Gunicorn and Uvicorn as the worker class to run your Async Service. Execute the following command in your terminal:

gunicorn app:app --workers 1 --worker-class uvicorn.workers.UvicornWorker --bind 127.0.0.1:8000

This command starts the service and binds it to http://localhost:8000. You should see an output similar to the following indicating that the service is running:

[2023-08-17 16:10:33 +0530] [78736] [INFO] Starting gunicorn 21.2.0  
[2023-08-17 16:10:33 +0530] [78736] [INFO] Listening at: <http://127.0.0.1:8000> (78736)  
[2023-08-17 16:10:33 +0530] [78736] [INFO] Using worker: uvicorn.workers.UvicornWorker  
[2023-08-17 16:10:33 +0530] [78738] [INFO] Booting worker with pid: 78738  
[2023-08-17 16:10:33 +0530] [78738] [INFO] Started server process [78738]  
[2023-08-17 16:10:33 +0530] [78738] [INFO] Waiting for application startup.  
...  
[2023-08-17 16:10:34 +0530] [78738] [INFO] Application startup complete.

Step 4: Send a Process Request

To interact with your deployed Async Service and send a process request, follow these steps:

4.1 Create a Python Script for Sending Asynchronous Requests

import json  
import uuid  
from async_processor import InputMessage, OutputMessage, ProcessStatus  
import boto3

def send_request(input_sqs_url: str, output_sqs_url: str):  
    sqs = boto3.client("sqs")  
    request_id = str(uuid.uuid4())

sqs.send_message(
    QueueUrl=input_sqs_url,
    MessageBody=json.dumps(
        InputMessage(request_id=request_id, body={"x": 1, "y": 2}).dict()
    ),
)

while True:
    response = sqs.receive_message(
        QueueUrl=output_sqs_url, MaxNumberOfMessages=1, WaitTimeSeconds=19
    )
    if "Messages" not in response:
        continue
    msg = response["Messages"][0]
    response = OutputMessage(**json.loads(msg["Body"]))

    if ProcessStatus[response.status] is not ProcessStatus.SUCCESS:
        raise Exception(f"Processing failed: {response.error}")
    print(response)
    break

To send a process request using a Python script (e.g., send_async_request.py) with AWS SQS integration:

  • Initialize AWS SQS Client: An AWS SQS client is initialized using boto3.client("sqs").
  • Generate Request ID: A unique request ID is generated using uuid.uuid4(). This ID helps identify each asynchronous request.
  • Send Input Message: The code uses sqs.send_message to send the input message to the specified input SQS queue (input_sqs_url). The message body is a JSON representation of an InputMessage object, including the request ID and input data ({"x": 1, "y": 2}).
  • Wait and Process Response: A loop is started to wait for a response from the output SQS queue (output_sqs_url). sqs.receive_message is used to receive a message, with the WaitTimeSeconds parameter controlling the wait time. If no messages are received, the loop continues to wait.
  • Process Response Message: Once a message is received, it is parsed as an OutputMessage object. The code checks the processing status, and if it's unsuccessful, it raises an exception with the error message.

4.2 Run the Python Script

Execute the Python script to send an asynchronous process request to your service:

python send_async_request.py

You will receive an output similar to the following, indicating the status and result of the asynchronous request:

request_id='46a4ebc6-afdb-46a0-8587-ba29abf0f0d4' status='SUCCESS' body=2 error=None

Congratulations! You have successfully created an asynchronous service using the Async Processor library.

Deploy your Async Service

Follow the following guide to learn how to Deploy the Async Service you just created: In the following guide