Creating an Async Service
This section walks you through creating an Async Service with the Async Processor library. The resulting service processes requests asynchronously. Follow these steps:
Step 1: Install Async Processor
Before you can create an Async Service, install the Async Processor library along with its SQS (Simple Queue Service) support:
pip install "async_processor[sqs]"
Step 2: Write the Processor
from async_processor import (
    InputMessage,
    Processor,
)


class MultiplicationProcessor(Processor):
    def process(self, input_message: InputMessage) -> int:
        # The message body carries the two operands, x and y.
        body = input_message.body
        return body["x"] * body["y"]


# Build the app object that the server in Step 3 runs.
app = MultiplicationProcessor().build_app()
To create an Async Service, you must define a Processor that handles asynchronous requests. The example above, a processor that multiplies two numbers, does this in three steps:
- Inherit the Processor class:
  - Create a new class (e.g., MultiplicationProcessor) that inherits from the provided Processor class. This class handles async request processing.
- Override the process method:
  - Inside your custom processor class, implement the process method. It specifies how async requests are processed: it takes an InputMessage argument and returns the processed result.
  - In the example, it multiplies two numbers (x and y) taken from the message body.
- Build the app:
  - Create an instance of your custom processor class and call build_app() on it (e.g., MultiplicationProcessor().build_app()). The returned app represents your async service.
  - Building the app configures and sets up endpoints for your Async Service.
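The multiplication logic itself can also be exercised without a queue or a running service. The sketch below is a minimal, library-free illustration under stated assumptions: FakeInputMessage is a hypothetical stand-in for the library's InputMessage (only the request_id and body fields used in this guide are assumed), and multiply is an illustrative helper, not part of Async Processor.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeInputMessage:
    """Hypothetical stand-in for async_processor.InputMessage (assumed fields)."""
    request_id: str
    body: Any


def multiply(input_message: FakeInputMessage) -> int:
    """Same logic as MultiplicationProcessor.process, plus basic validation."""
    body = input_message.body
    # Fail early with a clear error if an operand is missing.
    for key in ("x", "y"):
        if key not in body:
            raise ValueError(
                f"missing field {key!r} in request {input_message.request_id}"
            )
    return body["x"] * body["y"]


result = multiply(FakeInputMessage(request_id="demo", body={"x": 1, "y": 2}))
print(result)  # prints 2
```

Validating the body up front is a design choice for this sketch; the original processor simply raises a KeyError if x or y is absent.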
Step 3: Run the Async Service
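You can run your Async Service with Gunicorn, using Uvicorn as the worker class. The app:app argument refers to the app object in a module named app, so the command below assumes the code from Step 2 is saved as app.py. Execute the following command in your terminal: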
gunicorn app:app --workers 1 --worker-class uvicorn.workers.UvicornWorker --bind 127.0.0.1:8000
This command starts the service and binds it to http://127.0.0.1:8000 (localhost). You should see output similar to the following, indicating that the service is running:
[2023-08-17 16:10:33 +0530] [78736] [INFO] Starting gunicorn 21.2.0
[2023-08-17 16:10:33 +0530] [78736] [INFO] Listening at: http://127.0.0.1:8000 (78736)
[2023-08-17 16:10:33 +0530] [78736] [INFO] Using worker: uvicorn.workers.UvicornWorker
[2023-08-17 16:10:33 +0530] [78738] [INFO] Booting worker with pid: 78738
[2023-08-17 16:10:33 +0530] [78738] [INFO] Started server process [78738]
[2023-08-17 16:10:33 +0530] [78738] [INFO] Waiting for application startup.
...
[2023-08-17 16:10:34 +0530] [78738] [INFO] Application startup complete.
Step 4: Send a Process Request
To interact with your deployed Async Service and send a process request, follow these steps:
4.1 Create a Python Script for Sending Asynchronous Requests
import json
import uuid

import boto3
from async_processor import InputMessage, OutputMessage, ProcessStatus


def send_request(input_sqs_url: str, output_sqs_url: str):
    sqs = boto3.client("sqs")
    # A unique ID identifies this request and its response.
    request_id = str(uuid.uuid4())

    # Send the input message, serialized as JSON, to the input queue.
    sqs.send_message(
        QueueUrl=input_sqs_url,
        MessageBody=json.dumps(
            InputMessage(request_id=request_id, body={"x": 1, "y": 2}).dict()
        ),
    )

    # Long-poll the output queue until a response arrives.
    while True:
        response = sqs.receive_message(
            QueueUrl=output_sqs_url, MaxNumberOfMessages=1, WaitTimeSeconds=19
        )
        if "Messages" not in response:
            continue
        msg = response["Messages"][0]
        output = OutputMessage(**json.loads(msg["Body"]))
        if ProcessStatus[output.status] is not ProcessStatus.SUCCESS:
            raise Exception(f"Processing failed: {output.error}")
        print(output)
        break
The script above (saved as, e.g., send_async_request.py) sends a process request through AWS SQS as follows:
- Initialize AWS SQS Client: An AWS SQS client is initialized using boto3.client("sqs").
- Generate Request ID: A unique request ID is generated using uuid.uuid4(). This ID identifies each asynchronous request.
- Send Input Message: The code uses sqs.send_message to send the input message to the specified input SQS queue (input_sqs_url). The message body is a JSON representation of an InputMessage object, including the request ID and input data ({"x": 1, "y": 2}).
- Wait and Process Response: A loop waits for a response from the output SQS queue (output_sqs_url). sqs.receive_message is used to receive a message, with the WaitTimeSeconds parameter controlling how long each call waits. If no messages are received, the loop continues to wait.
- Process Response Message: Once a message is received, it is parsed as an OutputMessage object. The code checks the processing status and, if it is not SUCCESS, raises an exception with the error message; otherwise it prints the response.
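The loop described above waits indefinitely and accepts the first message it receives. One possible variation, sketched here under stated assumptions, stops after a deadline and only accepts a response whose request_id matches the request. The helper wait_for_response and its receive parameter are hypothetical names introduced for illustration, not part of Async Processor or boto3; receive stands for any callable that returns a list of raw JSON message bodies, such as a thin wrapper around sqs.receive_message. The JSON field names are assumed from the messages shown in this guide.

```python
import json
import time
from typing import Callable, List, Optional


def wait_for_response(
    receive: Callable[[], List[str]],
    request_id: str,
    timeout_s: float = 60.0,
) -> Optional[dict]:
    """Poll `receive` until a message with a matching request_id arrives.

    `receive` is a hypothetical callable returning a (possibly empty) list
    of raw JSON message bodies. Returns None if the deadline passes first.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        for raw in receive():
            message = json.loads(raw)
            # Skip responses that belong to other requests.
            if message.get("request_id") == request_id:
                return message
    return None


# Usage with an in-memory stand-in for the output queue.
queue = [
    [],  # first poll: nothing yet
    ['{"request_id": "other", "status": "SUCCESS", "body": 9, "error": null}'],
    ['{"request_id": "abc", "status": "SUCCESS", "body": 2, "error": null}'],
]
found = wait_for_response(lambda: queue.pop(0) if queue else [], "abc", timeout_s=5)
print(found["body"])  # prints 2
```

With a real SQS queue, a message that is received but not deleted becomes visible again after its visibility timeout, so a production version would also need to decide when to delete the messages it consumes.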
4.2 Run the Python Script
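Execute the Python script to send an asynchronous process request to your service. The script as shown only defines send_request, so make sure it also calls the function with your input and output queue URLs: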
python send_async_request.py
You will see output similar to the following, indicating the status and result of the asynchronous request:
request_id='46a4ebc6-afdb-46a0-8587-ba29abf0f0d4' status='SUCCESS' body=2 error=None
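To act on such a response in code rather than just print it, the result can be unwrapped after checking the status. The following is a rough, library-free sketch: FakeOutputMessage is a hypothetical stand-in that mirrors the four fields printed above, the JSON layout is assumed from that printed output, and unwrap is an illustrative helper name. The "FAILED" status used in the usage note is likewise an assumption; only SUCCESS appears in this guide.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeOutputMessage:
    """Hypothetical stand-in mirroring the fields printed above."""
    request_id: str
    status: str
    body: Any = None
    error: Optional[str] = None


def unwrap(raw_json: str) -> Any:
    """Return the result body, or raise if the status is not SUCCESS."""
    message = FakeOutputMessage(**json.loads(raw_json))
    if message.status != "SUCCESS":
        raise RuntimeError(f"Processing failed: {message.error}")
    return message.body


raw = (
    '{"request_id": "46a4ebc6-afdb-46a0-8587-ba29abf0f0d4", '
    '"status": "SUCCESS", "body": 2, "error": null}'
)
value = unwrap(raw)
print(value)  # prints 2
```

Passing a message with any other status, for example a hypothetical "FAILED", raises RuntimeError with the reported error text.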
Congratulations! You have successfully created an asynchronous service using the Async Processor library.