Using an Azure APIM Scatter-Gather policy with a Mapping Function

This is a follow-up to a previous blog post, “Azure APIM Scatter-Gather Pattern Policy”, where I wrote about using the Wait policy to create a scatter-gather pattern. A few colleagues were asking about being able to map the inbound request to the different schemas required by each of the microservices.

A high-level design is shown below using two “Wait” policies and an Azure Function for the mapping. The first policy, ‘Translation’, sends the request to the mapping function. When that completes, the second policy, ‘Scatter’, is executed to send the mapped request to each of the microservices.

[Image: high-level design showing the Translation and Scatter policies and the mapping function]

The internals of the Azure Function that maps the incoming request are shown below as an example. Here I am using a route on the supplier name and a simple if/else statement to select which static translator method to call.

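using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

public static class MappingService
{
    [FunctionName("DataMapping")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = "Pricing/{suppliername}")]
        HttpRequest req,
        string suppliername,
        ILogger log)
    {
        log.LogInformation("C# HTTP trigger function processed a request.");

        // Read the inbound price request as raw text.
        string requestBody = await new StreamReader(req.Body).ReadToEndAsync();

        // Select the translator from the supplier name in the route.
        dynamic response = null;
        if (suppliername.Equals("Supplier1", StringComparison.OrdinalIgnoreCase))
            response = Translator.TranslateSupplier1(requestBody);
        else if (suppliername.Equals("Supplier2", StringComparison.OrdinalIgnoreCase))
            response = Translator.TranslateSupplier2(requestBody);

        // An unknown supplier leaves response null and yields a 400.
        return response != null
            ? (ActionResult)new OkObjectResult(response)
            : new BadRequestObjectResult("Invalid message.");
    }
}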
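
The Translator class itself is not shown in this post. As a minimal sketch of what its two static methods might look like, the example below assumes a hypothetical inbound request with "productId" and "quantity" fields and two made-up supplier schemas. None of these field names come from the real services, and the methods return the mapped JSON as a string, which is also an assumption.

```csharp
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// Hypothetical sketch only: the field names and target schemas below are
// assumptions for illustration, not the real supplier contracts.
public static class Translator
{
    // Maps the inbound request to an assumed flat Supplier1 schema.
    public static string TranslateSupplier1(string requestBody)
    {
        JObject source = JObject.Parse(requestBody);
        var mapped = new JObject
        {
            ["sku"] = source["productId"],
            ["qty"] = source["quantity"]
        };
        return mapped.ToString(Formatting.None);
    }

    // Maps the inbound request to an assumed nested Supplier2 schema.
    public static string TranslateSupplier2(string requestBody)
    {
        JObject source = JObject.Parse(requestBody);
        var mapped = new JObject
        {
            ["item"] = new JObject
            {
                ["code"] = source["productId"],
                ["units"] = source["quantity"]
            }
        };
        return mapped.ToString(Formatting.None);
    }
}
```

Note that JObject.Parse throws on malformed JSON, so in this sketch an unparsable request body would surface as an unhandled exception in the function rather than the "Invalid message." response.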

Below is the code for the Translation policy, which calls the mapping function twice in parallel, once per supplier route, passing the inbound price request message to each call. The function access key is stored as a named value in APIM called “funcmapkey”.

<!-- Call the mapping service -->
<wait for="all">
    <send-request mode="copy" response-variable-name="res_SupplierMap1" timeout="120" ignore-error="false">
        <set-url>https://mapsvc.azurewebsites.net/api/Pricing/supplier1?code={{funcmapkey}}</set-url>
        <set-method>POST</set-method>
        <set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
    </send-request>
    <send-request mode="copy" response-variable-name="res_SupplierMap2" timeout="120" ignore-error="false">
        <set-url>https://mapsvc.azurewebsites.net/api/Pricing/supplier2?code={{funcmapkey}}</set-url>
        <set-method>POST</set-method>
        <set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
    </send-request>
</wait>
 

The code for the Scatter policy is shown below and is similar to the original blog post. However, instead of the raw request it sends the mapped outputs from the Translation policy, which are stored in the res_SupplierMap1 and res_SupplierMap2 context variables.

<!-- Call the pricing services -->
<wait for="all">
    <send-request mode="copy" response-variable-name="response_1" timeout="120" ignore-error="false">
        <set-url>{{serviceendpoint_1}}</set-url>
        <set-method>POST</set-method>
        <set-body>@(((IResponse)context.Variables["res_SupplierMap1"]).Body.As<string>())</set-body>
    </send-request>
    <send-request mode="copy" response-variable-name="response_2" timeout="120" ignore-error="false">
        <set-url>{{serviceendpoint_2}}</set-url>
        <set-method>POST</set-method>
        <set-body>@(((IResponse)context.Variables["res_SupplierMap2"]).Body.As<string>())</set-body>
    </send-request>
</wait>
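
The Scatter policy above forwards whatever the mapping function returned, including an error body such as "Invalid message.". One possible guard, placed between the two wait policies, is sketched below. It is not part of the original policy; the 502 status and the error text are my own assumptions.

```xml
<!-- Sketch: stop before the Scatter step if either mapping call did not return 200 -->
<choose>
    <when condition="@((int)((IResponse)context.Variables["res_SupplierMap1"]).StatusCode != 200 || (int)((IResponse)context.Variables["res_SupplierMap2"]).StatusCode != 200)">
        <return-response>
            <set-status code="502" reason="Bad Gateway" />
            <set-header name="Content-Type" exists-action="override">
                <value>application/json</value>
            </set-header>
            <set-body>@(new JObject(new JProperty("error", "Request mapping failed")).ToString())</set-body>
        </return-response>
    </when>
</choose>
```

With this in place the pricing services would only be called once both mapped payloads are available.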

The last policy checks the status of each pricing service response. If either call did not return 200, that response is returned to the caller as-is; otherwise the results are returned as a composite message. This is similar to the original blog post, but instead of returning a JObject I am now returning a JArray collection.

<choose>
    <when condition="@((int)((IResponse)context.Variables["response_1"]).StatusCode != 200)">
        <return-response response-variable-name="response_1" />
    </when>
    <when condition="@((int)((IResponse)context.Variables["response_2"]).StatusCode != 200)">
        <return-response response-variable-name="response_2" />
    </when>
    <otherwise>
        <return-response>
            <set-status code="200" reason="OK" />
            <set-header name="Content-Type" exists-action="override">
                <value>application/json</value>
            </set-header>
            <set-body>@{
                JArray suppliers = new JArray();
                suppliers.Add(((IResponse)context.Variables["response_1"]).Body.As<JObject>());
                suppliers.Add(((IResponse)context.Variables["response_2"]).Body.As<JObject>());
                return suppliers.ToString();
            }</set-body>
        </return-response>
    </otherwise>
</choose>

The complete policy for the API operation is shown below. Note that the request payload is first stored in a variable named “requestPayload” to avoid locking the body context in the <wait> policies.

<policies>
    <inbound>
        <set-variable name="requestPayload" value="@(context.Request.Body.As&lt;string>(true))" />
        <!-- Call the mapping service -->
        <wait for="all">
            <send-request mode="copy" response-variable-name="res_SupplierMap1" timeout="120" ignore-error="false">
                <set-url>https://mapsvc.azurewebsites.net/api/Pricing/supplier1?code={{funcmapkey}}</set-url>
                <set-method>POST</set-method>
                <set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
            </send-request>
            <send-request mode="copy" response-variable-name="res_SupplierMap2" timeout="120" ignore-error="false">
                <set-url>https://mapsvc.azurewebsites.net/api/Pricing/supplier2?code={{funcmapkey}}</set-url>
                <set-method>POST</set-method>
                <set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
            </send-request>
        </wait>
        <!-- Call the pricing services -->
        <wait for="all">
            <send-request mode="copy" response-variable-name="response_1" timeout="120" ignore-error="false">
                <set-url>{{serviceendpoint_1}}</set-url>
                <set-method>POST</set-method>
                <set-body>@(((IResponse)context.Variables["res_SupplierMap1"]).Body.As<string>())</set-body>
            </send-request>
            <send-request mode="copy" response-variable-name="response_2" timeout="120" ignore-error="false">
                <set-url>{{serviceendpoint_2}}</set-url>
                <set-method>POST</set-method>
                <set-body>@(((IResponse)context.Variables["res_SupplierMap2"]).Body.As<string>())</set-body>
            </send-request>
        </wait>
        <choose>
            <when condition="@((int)((IResponse)context.Variables["response_1"]).StatusCode != 200)">
                <return-response response-variable-name="response_1" />
            </when>
            <when condition="@((int)((IResponse)context.Variables["response_2"]).StatusCode != 200)">
                <return-response response-variable-name="response_2" />
            </when>
            <otherwise>
                <return-response>
                    <set-status code="200" reason="OK" />
                    <set-header name="Content-Type" exists-action="override">
                        <value>application/json</value>
                    </set-header>
                    <set-body>@{
                        JArray suppliers = new JArray();
                        suppliers.Add(((IResponse)context.Variables["response_1"]).Body.As<JObject>());
                        suppliers.Add(((IResponse)context.Variables["response_2"]).Body.As<JObject>());
                        return suppliers.ToString();
                    }</set-body>
                </return-response>
            </otherwise>
        </choose>
    </inbound>
    <backend>
        <base />
    </backend>
    <outbound>
        <base />
    </outbound>
    <on-error>
        <base />
    </on-error>
</policies>

Using the tracing feature in APIM, you can see the initial price request message below. This message is sent to the Pricing route of the mapping function.

[Image: APIM trace showing the initial price request message]

Below is the trace output from APIM showing the two different messages returned from the mapping function and assigned to the res_SupplierMap1 and res_SupplierMap2 variables.

[Image: APIM trace showing the two mapped messages]

Below is the composite message returned from APIM as a JSON array containing each supplier.

[Image: composite JSON array response returned from APIM]

In conclusion, using the two <wait> policies to send multiple requests in parallel yields a request/response latency of around 200ms on average in this scenario. Also, instead of using an Azure Function to map the incoming request, you could replace it with a couple of “Transformation” policies.
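
As a rough illustration of that last point, the mapping could be done directly in the set-body of each send-request in the Scatter policy, which would remove the first wait policy altogether. The sketch below shows only the Supplier1 call and assumes hypothetical "productId" and "quantity" fields on the inbound request and made-up "sku" and "qty" fields on the supplier side; none of these names come from the real services.

```xml
<!-- Sketch: map the request inline instead of calling the mapping function -->
<send-request mode="copy" response-variable-name="response_1" timeout="120" ignore-error="false">
    <set-url>{{serviceendpoint_1}}</set-url>
    <set-method>POST</set-method>
    <set-body>@{
        // Field names here are assumptions for illustration only.
        JObject source = JObject.Parse(context.Variables.GetValueOrDefault<string>("requestPayload"));
        return new JObject(
            new JProperty("sku", source["productId"]),
            new JProperty("qty", source["quantity"])
        ).ToString();
    }</set-body>
</send-request>
```

The trade-off is that the mapping logic then lives inside the policy, which is fine for simple field renames but is likely to get harder to maintain and test than a function as the schemas grow.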

Enjoy…
