Mule 3.9: Using Cache Scope to Avoid Parallel Processing of the same Mule Applic...

 5 years ago
source link: https://www.tuicool.com/articles/hit/EFFziuA
DZone's Guide to

Mule 3.9: Using Cache Scope to Avoid Parallel Processing of the same Mule Application

In this article, I will explain how to implement logic in Mule ESB that prevents parallel processing, that is, the same Mule application running more than once at the same time.

This Mule application is triggered by an HTTP GET request. For each new request, it creates a variable, #[flowVars.correlationId], whose value comes from #[message.rootId]. This ID uniquely identifies a particular Mule process. Inside the Cache Scope, a Transform Message component builds the payload { correlationId: flowVars.correlationId } (a Java Map), and that payload is what gets cached. After the scope, a Choice Router determines whether a process is already running by evaluating #[payload.correlationId == flowVars.correlationId]. If TRUE (there was no cached data yet, so the payload holds this request's own ID), the flow responds with HTTP status 200; if FALSE, it returns HTTP status 409 Conflict.

Note that the #[payload.correlationId] value always comes from the Cache Scope's object store when cached data exists; otherwise, the scope caches and returns the current #[flowVars.correlationId] value.
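
One way to watch this behavior while testing is to log both values right after the Cache Scope. The fragment below is a minimal sketch and not part of the original application: the doc:name and the message wording are illustrative assumptions, and it presumes the same namespace declarations as the full configuration.

```xml
<!-- Hypothetical diagnostic logger, placed directly after </ee:cache>. -->
<!-- payload.correlationId is the value returned by the Cache Scope; -->
<!-- flowVars.correlationId is the ID of the current request. -->
<logger message="#['cached=' + payload.correlationId + ', current=' + flowVars.correlationId]"
        level="INFO" doc:name="Log Cached vs Current correlationId"/>
```

When the two values match, the current request populated the cache itself; when they differ, an earlier request's entry is still in the store.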

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:db="http://www.mulesoft.org/schema/mule/db"
xmlns:file="http://www.mulesoft.org/schema/mule/file"
xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd">


    <ee:object-store-caching-strategy name="Caching_Strategy" doc:name="Caching Strategy">
        <managed-store storeName="correlation_id_store" maxEntries="-1" entryTTL="18000000" expirationInterval="18000000"/>
    </ee:object-store-caching-strategy>
    <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" basePath="/api" doc:name="HTTP Listener Configuration"/>

    <flow name="pf-avoid-parallel-process">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/process" allowedMethods="GET" doc:name="HTTP"/>
        <set-variable variableName="correlationId" value="#[message.rootId]" doc:name="Variable Set correlationId"/>
        <set-payload value="Caching Correlation Id" doc:name="Set Payload As String"/>
        <ee:cache doc:name="Cache" cachingStrategy-ref="Caching_Strategy">
            <dw:transform-message doc:name="Transform Message">
                <dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
{
correlationId: flowVars.correlationId
}]]></dw:set-payload>
            </dw:transform-message>
        </ee:cache>
        <choice doc:name="Choice correlationId?">
            <when expression="#[payload.correlationId == flowVars.correlationId]">
                <scripting:transformer doc:name="Groovy Sleep(15000)">
                    <scripting:script engine="Groovy"><![CDATA[sleep(15000)]]></scripting:script>
                </scripting:transformer>
                <ee:invalidate-cache doc:name="Invalidate Cache" cachingStrategy-ref="Caching_Strategy"/>
                <dw:transform-message doc:name="Transform Message 200 JSON Response">
                    <dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
correlationId: flowVars.correlationId,
status: "COMPLETED"
}]]></dw:set-payload>
                </dw:transform-message>
            </when>
            <otherwise>
                <logger level="INFO" doc:name="Logger"/>
                <dw:transform-message doc:name="Transform Message 409 JSON Response">
                    <dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
status: "Error",
message: "There's already running process with correlationId " ++ payload.correlationId ++ "."
}]]></dw:set-payload>
                    <dw:set-property propertyName="http.status"><![CDATA[%dw 1.0
%output application/java
---
409]]></dw:set-property>
                </dw:transform-message>
            </otherwise>
        </choice>
        <logger message="#[flowVars.correlationId] - End Of Process" level="INFO" doc:name="Logger"/>
    </flow>

</mule>

OBJECT-STORE-CACHING-STRATEGY (Cache Scope Configuration):

managed-store

An object store that saves cached responses in a place defined by ListableObjectStore:

  • storeName - The name of the object store that holds the cached entries.
  • maxEntries - Maximum number of entries (i.e., cached responses); -1 means no limit.
  • entryTTL - The lifespan (time to live) of a cached response within the object store, in milliseconds.
  • expirationInterval - How often, in milliseconds, the store is polled for expired cached responses.

For more information regarding this configuration, see https://docs.mulesoft.com/mule-user-guide/v/3.9/cache-scope#configobjstore

<ee:object-store-caching-strategy name="Caching_Strategy" doc:name="Caching Strategy">
  <managed-store storeName="correlation_id_store" 
  maxEntries="-1" 
  entryTTL="18000000" 
  expirationInterval="18000000"/>
</ee:object-store-caching-strategy>
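
Both entryTTL and expirationInterval are expressed in milliseconds, so 18000000 corresponds to 5 hours (18000000 / 1000 / 60 / 60 = 5). If a run never reaches Invalidate Cache, the entry would presumably remain until that TTL expires. As a sketch only, a shorter TTL could act as a tighter safety net. The 60000 and 10000 values below are illustrative assumptions, and the TTL should exceed the longest expected run time.

```xml
<ee:object-store-caching-strategy name="Caching_Strategy" doc:name="Caching Strategy">
    <!-- Assumed values for illustration: entries live 60 s, expiry check every 10 s. -->
    <managed-store storeName="correlation_id_store"
                   maxEntries="-1"
                   entryTTL="60000"
                   expirationInterval="10000"/>
</ee:object-store-caching-strategy>
```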

CACHE SCOPE: This scope always stores the payload returned by the Transform Message, which is { correlationId: flowVars.correlationId }. Also note that the payload entering the Cache Scope should be a String; otherwise, it will not work. By default, the cache key is derived from the incoming payload, so a fixed string means every request maps to the same cache entry.

<set-payload value="Caching Correlation Id" doc:name="Set Payload As String"/>
<ee:cache doc:name="Cache" cachingStrategy-ref="Caching_Strategy">
    <dw:transform-message doc:name="Transform Message">
        <dw:set-payload><![CDATA[%dw 1.0
            %output application/java
            ---
            {
                correlationId: flowVars.correlationId
            }]]>
        </dw:set-payload>
    </dw:transform-message>
</ee:cache>
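
The fixed string payload matters because the Cache Scope builds its key from the incoming message, by default from a digest of the payload. An alternative sketch is to state the key explicitly with the keyGenerationExpression attribute of the caching strategy. The constant 'pf-avoid-parallel-process' below is an arbitrary, hypothetical key chosen for illustration, and this variant has not been tested against the flow in this article.

```xml
<!-- Sketch: a constant MEL expression makes every request share one cache entry. -->
<ee:object-store-caching-strategy name="Caching_Strategy"
                                  keyGenerationExpression="#['pf-avoid-parallel-process']"
                                  doc:name="Caching Strategy">
    <managed-store storeName="correlation_id_store"
                   maxEntries="-1"
                   entryTTL="18000000"
                   expirationInterval="18000000"/>
</ee:object-store-caching-strategy>
```

With an explicit key, the cache entry no longer depends on what the payload happens to be when the request reaches the scope. Keeping the Set Payload step in place is harmless.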

INVALIDATE-CACHE: Invalidating the cache clears the cached data that exists in the object store.

<ee:invalidate-cache doc:name="Invalidate Cache" 
 cachingStrategy-ref="Caching_Strategy"/>
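
In the flow, Invalidate Cache runs only on the success path. If the work between the Cache Scope and Invalidate Cache throws an exception, the entry would stay in the store until its TTL expires, and later requests would keep receiving 409. A possible safeguard, sketched here under assumptions, is a catch exception strategy at the end of the flow that also invalidates the cache. The doc:name values, the 500 status, and the error text are illustrative and not part of the original application.

```xml
<flow name="pf-avoid-parallel-process">
    <!-- ... existing message processors of the flow stay unchanged ... -->
    <catch-exception-strategy doc:name="Release Lock On Failure">
        <!-- Clear the cached correlationId so the next request is not rejected. -->
        <ee:invalidate-cache doc:name="Invalidate Cache" cachingStrategy-ref="Caching_Strategy"/>
        <set-property propertyName="http.status" value="500" doc:name="Set HTTP Status 500"/>
        <set-payload value="#['Process ' + flowVars.correlationId + ' failed.']" doc:name="Set Error Payload"/>
    </catch-exception-strategy>
</flow>
```

This sketch invalidates on any failure, including one raised in the 409 branch, which would release an entry owned by another request. A stricter version would first compare the cached and current IDs.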

CHOICE (Choice correlationId?): A Choice Router with the condition #[payload.correlationId == flowVars.correlationId]. If it evaluates to TRUE, the flow executes the Groovy Sleep(15000), invalidates the cache, and responds with HTTP status 200 OK and a JSON payload indicating that the process completed. If FALSE, it returns HTTP status 409 Conflict with a JSON payload indicating that a process is currently running.

<choice doc:name="Choice correlationId?">
    <when expression="#[payload.correlationId == flowVars.correlationId]">
        <scripting:transformer doc:name="Groovy Sleep(15000)">
            <scripting:script engine="Groovy"><![CDATA[sleep(15000)]]></scripting:script>
        </scripting:transformer>
        <ee:invalidate-cache doc:name="Invalidate Cache" cachingStrategy-ref="Caching_Strategy"/>
        <dw:transform-message doc:name="Transform Message 200 JSON Response">
            <dw:set-payload><![CDATA[%dw 1.0
                %output application/json
                ---
                {
                    correlationId: flowVars.correlationId,
                    status: "COMPLETED"
                }]]>
            </dw:set-payload>
        </dw:transform-message>
    </when>
    <otherwise>
        <dw:transform-message doc:name="Transform Message 409 JSON Response">
            <dw:set-payload><![CDATA[%dw 1.0
                %output application/json
                ---
                {
                    status: "Error",
                    message: "There's already running process with correlationId " ++ payload.correlationId ++ "."
                }]]>
            </dw:set-payload>
            <dw:set-property propertyName="http.status"><![CDATA[%dw 1.0
                %output application/java
                ---
                409]]>
            </dw:set-property>
        </dw:transform-message>
    </otherwise>
</choice>

TEST RESULT:

Trigger the API from Postman > First Tab. The Groovy transformer's sleep(15000) call executes, holding the process for 15 seconds.

Retriggering the API from Postman > Second Tab while the first request is still running results in an HTTP 409 Conflict with a JSON response message indicating that a process is currently running.

Retriggering the API from Postman > Second Tab after the 15 seconds have elapsed results in an HTTP 200 OK with a JSON response message indicating that the process completed.
