

JMS Throttling With Camunda Application
source link: https://dzone.com/articles/jms-throttling-with-camunda-application

IBM MQ exposes a JMS interface, which a Camunda Spring Boot application can connect to by adding the mq-jms-spring-boot-starter library as a dependency in the pom.xml file. The message delivered to Camunda is in XML SOAP format and is posted to the queue through MQJ Explorer, which acts as a client to IBM MQ. The application provides methods to start and stop the JMS listener, to check its status, and to set maxConnectionSize.
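The listener shown later in this article reads a transactionId element from the incoming XML and uses it as the process business key. As a minimal sketch of that step in isolation, the class below parses a sample payload with StAX. The SOAP envelope and the publishBillingEvent element name are assumptions for illustration, since the real message layout is not shown here. This version returns the first transactionId it finds and disables DTDs and external entities, which is a common precaution for XML that arrives from outside the application.

```java
import java.io.StringReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

public class TransactionIdExtractor {

    // Hypothetical SOAP payload; only the transactionId element is taken from the article.
    public static final String SAMPLE =
        "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\">"
      + "<soapenv:Body><publishBillingEvent>"
      + "<transactionId>TX-1001</transactionId>"
      + "</publishBillingEvent></soapenv:Body></soapenv:Envelope>";

    /** Returns the text of the first transactionId element, or null if there is none. */
    public static String extractTransactionId(String xml) {
        try {
            XMLInputFactory factory = XMLInputFactory.newInstance();
            // The payload comes from a queue, so DTDs and external entities are switched off.
            factory.setProperty(XMLInputFactory.SUPPORT_DTD, false);
            factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
            XMLStreamReader reader = factory.createXMLStreamReader(new StringReader(xml));
            try {
                while (reader.hasNext()) {
                    // Advance to the next event and look for the start of <transactionId>.
                    if (reader.next() == XMLStreamConstants.START_ELEMENT
                            && "transactionId".equalsIgnoreCase(reader.getLocalName())) {
                        return reader.getElementText();
                    }
                }
                return null;
            } finally {
                reader.close();
            }
        } catch (XMLStreamException e) {
            throw new IllegalArgumentException("Payload is not well-formed XML", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(extractTransactionId(SAMPLE)); // prints TX-1001
    }
}
```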
Prerequisites
- Eclipse (any version) with Maven capabilities
- Java 8+
- IBM MQ and MQJ Explorer
- Camunda
Installing Eclipse-IDE on Windows
Creating a Maven Project in Eclipse IDE
1. Open the Eclipse IDE.
2. Go to File > New > Project.
3. Go to Maven > Maven Project and click Next.
4. Select your workspace location and click Next.
5. Select the quickstart Maven archetype (maven-archetype-quickstart) and click Next.
6. Enter the Group Id, Artifact Id, and package name:
Group Id: a groupId of your choice for the project.
Artifact Id: an artifactId of your choice for the project.
Package: a Java package structure of your choice.
The above process creates the standard Maven project structure.
7. Create the package com.example.demo.delegate under the src/main/java folder, and create the source folder src/main/resources.
8. Place the CamundaApplication.java file in the com.example.demo package.
package com.example.demo;
import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;
import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.ProcessEngines;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.variable.Variables;
import org.camunda.bpm.engine.variable.value.ObjectValue;
import org.camunda.bpm.spring.boot.starter.annotation.EnableProcessApplication;
import org.camunda.bpm.spring.boot.starter.event.PostDeployEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.JmsAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.event.EventListener;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
@ComponentScan("com.example.demo")
@SpringBootApplication(scanBasePackages = "com.example.demo.*",exclude={JmsAutoConfiguration.class})
@EnableProcessApplication()
@EnableJms
public class CamundaApplication {
Object lock = new Object();
boolean firstTimeP = true;
@Autowired
ProcessEngine engine = ProcessEngines.getDefaultProcessEngine();
public static void main(String[] args) {
SpringApplication.run(CamundaApplication.class, args);
}
String xmlRequestPublishBilling = null;
@EventListener
public void onPostDeploy(PostDeployEvent event) {
synchronized (lock) {
lock.notifyAll();
firstTimeP = false;
}
}
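// The id lets O2JMSController look up this listener's container by name ("PublishListener").
@JmsListener(id = "PublishListener", destination = "${ibm.mq.queue.publishBilling}")
public void listenerPublish(javax.jms.Message message) {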
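// Wait until the process application is deployed. The flag is checked inside the
// synchronized block so that a notifyAll() from onPostDeploy() cannot be missed.
synchronized (lock) {
while (firstTimeP) {
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}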
try {
if (message instanceof com.ibm.jms.JMSBytesMessage) {
com.ibm.jms.JMSBytesMessage mess = (com.ibm.jms.JMSBytesMessage) message;
mess.acknowledge();
byte[] payload = new byte[(int) mess.getBodyLength()];
mess.readBytes(payload);
xmlRequestPublishBilling = new String(payload);
} else {
com.ibm.jms.JMSTextMessage mess = (com.ibm.jms.JMSTextMessage) message;
xmlRequestPublishBilling = mess.getText();
mess.acknowledge();
}
String transactionId = null;
byte[] byteArray;
byteArray = xmlRequestPublishBilling.getBytes("UTF-8");
ByteArrayInputStream inputStream = new ByteArrayInputStream(byteArray);
XMLInputFactory inputFactory = XMLInputFactory.newInstance();
XMLStreamReader streamReader = inputFactory.createXMLStreamReader(inputStream);
while (streamReader.hasNext()) {
// Move to next event
streamReader.next();
// Check if its 'START_ELEMENT'
if (streamReader.getEventType() == XMLStreamReader.START_ELEMENT) {
String tagName = streamReader.getLocalName();
if (tagName.equalsIgnoreCase("transactionId")) {
transactionId = streamReader.getElementText();
}
}
}
RuntimeService runtimeService = engine.getRuntimeService();
Map<String, Object> processVariableMap = new HashMap<String, Object>();
ObjectValue xmlValue = Variables.objectValue(xmlRequestPublishBilling)
.serializationDataFormat("application/xml").create();
processVariableMap.put(Constant.REQUEST, xmlValue);
runtimeService.createMessageCorrelation("RecpSrvc_Billing_Initiator_Message")
.processInstanceBusinessKey(transactionId).setVariables(processVariableMap).correlateStartMessage();
} catch (Exception e) {
e.printStackTrace();
}
}
}
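The listener above holds back incoming messages until Camunda has published its PostDeployEvent. As an alternative sketch of that startup gate (not the article's implementation), a java.util.concurrent.CountDownLatch expresses the same idea without a separate lock object and flag. The class name StartupGate and its method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StartupGate {

    // Released exactly once, when deployment has finished.
    private final CountDownLatch deployed = new CountDownLatch(1);

    /** Call this from the PostDeployEvent handler. Calling it more than once is harmless. */
    public void open() {
        deployed.countDown();
    }

    /**
     * Blocks until open() has been called or the timeout elapses.
     * Returns true if the gate is open, false on timeout or interruption.
     */
    public boolean awaitOpen(long timeout, TimeUnit unit) {
        try {
            return deployed.await(timeout, unit);
        } catch (InterruptedException e) {
            // Preserve the interrupt so the listener container can shut down cleanly.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        StartupGate gate = new StartupGate();
        System.out.println(gate.awaitOpen(50, TimeUnit.MILLISECONDS)); // false, gate still closed
        gate.open();
        System.out.println(gate.awaitOpen(50, TimeUnit.MILLISECONDS)); // true
    }
}
```

In the listener, a call such as gate.awaitOpen(...) would take the place of the wait loop, and onPostDeploy would call gate.open(). Once the latch has been released, later calls return immediately, so threads that arrive after deployment are never blocked.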
9. Add the Constant.java file in the com.example.demo package.
package com.example.demo;
public class Constant {
// Name of the process variable that carries the incoming XML request
public static final String REQUEST = "request";
}
10. Add the O2JMSController.java file in the com.example.demo package.
package com.example.demo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQSimpleConnectionManager;
@RestController
@RequestMapping("/jms")
public class O2JMSController {
private static final Logger LOGGER = LoggerFactory.getLogger(O2JMSController.class);
@Autowired
JmsListenerEndpointRegistry jmsListenerEndpointRegistry;
static MQSimpleConnectionManager connectionMgr = null;
static int size = 40;
@RequestMapping(value = "/publishBilling/stop", method = RequestMethod.POST)
String stopPublishBilling(@RequestBody String command) {
MessageListenerContainer container = jmsListenerEndpointRegistry.getListenerContainer("PublishListener");
container.stop();
return "PublishBilling JMS Listener Stopped";
}
@RequestMapping(value = "/publishBilling/start", method = RequestMethod.POST)
String startPublishBilling(@RequestBody String command) {
MessageListenerContainer container = jmsListenerEndpointRegistry.getListenerContainer("PublishListener");
container.start();
return "PublishBilling JMS Listener Started";
}
@RequestMapping(value = "/publishBilling/status", method = RequestMethod.POST)
String getStatusPublishBilling(@RequestBody String command) {
MessageListenerContainer container = jmsListenerEndpointRegistry.getListenerContainer("PublishListener");
boolean status = container.isRunning();
if (status) {
return "PublishBilling JMS Listener Running";
} else {
return "PublishBilling JMS Listener not Running";
}
}
@RequestMapping(value = "/maxConnectionSize", method = RequestMethod.POST)
String setMaxConnections(@RequestBody String connectionSize) {
// Trim first: request bodies often arrive with a trailing newline.
size = Integer.parseInt(connectionSize.trim());
stopPublishBilling(null);
if (connectionMgr == null) {
connectionMgr = new MQSimpleConnectionManager();
}
connectionMgr.setActive(MQSimpleConnectionManager.MODE_AUTO);
connectionMgr.setMaxConnections(size);
MQEnvironment.setDefaultConnectionManager(connectionMgr);
startPublishBilling(null);
return "maxConnections is set :: " + size;
}
@RequestMapping(value = "/maxConnectionSize", method = RequestMethod.GET)
String getMaxConnections() {
// int connectionSize = 0;
return "maxConnection size is :: " + size;
}
}
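The maxConnectionSize endpoint above converts the raw request body to an int. As a hedged sketch, the helper below shows one way to validate that value before it is applied. The class name ConnectionSizeParser and the upper bound of 500 are assumptions for illustration and are not limits taken from IBM MQ or from the article.

```java
public class ConnectionSizeParser {

    // Hypothetical upper bound, chosen only for this example.
    public static final int MAX_ALLOWED = 500;

    /** Parses a request body such as " 40\n" into a connection count between 1 and MAX_ALLOWED. */
    public static int parse(String body) {
        if (body == null) {
            throw new IllegalArgumentException("Connection size is required");
        }
        String trimmed = body.trim();
        int value;
        try {
            value = Integer.parseInt(trimmed);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Connection size must be an integer: " + trimmed, e);
        }
        if (value < 1 || value > MAX_ALLOWED) {
            throw new IllegalArgumentException(
                "Connection size must be between 1 and " + MAX_ALLOWED + ": " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(parse(" 40\n")); // prints 40
    }
}
```

Rejecting bad input before the listener is stopped would keep an invalid request from interrupting message consumption.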
11. Add the BillingDelegate.java file in the com.example.demo.delegate package.
package com.example.demo.delegate;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.JavaDelegate;
import org.camunda.bpm.engine.variable.value.ObjectValue;
import org.springframework.stereotype.Component;
import com.example.demo.Constant;
@Component("BillingDelegate")
public class BillingDelegate implements JavaDelegate {
public void execute(DelegateExecution execution) {
ObjectValue xmlRequestObj = (ObjectValue) execution.getVariableTyped(Constant.REQUEST);
String xmlRequest = (String) xmlRequestObj.getValue().toString();
System.out.println("Request :" + xmlRequest);
}
}
12. Add application.properties, application.yaml, and RcsBillingEventInitiator.bpmn to the src/main/resources folder.
application.properties
ibm.mq.queueManager=M0DCRMT3
ibm.mq.channel=BOSS.SVRCONN
ibm.mq.connName=iv4239.uname.telecom.co.nz(1434)
ibm.mq.user=
ibm.mq.password=
ibm.mq.pool.enabled=true
ibm.mq.pool.maxConnections=40
ibm.mq.pool.maxSessionsPerConnection=500
ibm.mq.queue.publishBilling=DEV/BOSS/PUBLISH_BILLING_EVENT
server.port=8080
application.yaml
spring.h2.console.enabled: true
spring.datasource:
  url: jdbc:h2:./camunda-h2-database;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE;AUTO_SERVER=TRUE;
  username: sa
  password: sa
camunda.bpm:
  admin-user:
    id: demo
    password: demo
    firstName: Demo
    lastName: Demo
  filter:
    create: All Tasks
RcsBillingEventInitiator.bpmn
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" id="Definitions_0xx4klu" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="4.4.0">
<bpmn:collaboration id="Collaboration_1g0hs7s">
<bpmn:participant id="RCS_BillingEvent_Initiator" name="Receptor Service Billing Event Initiator" processRef="RCS_BillingEvent_Initiator_Group" />
</bpmn:collaboration>
<bpmn:process id="RCS_BillingEvent_Initiator_Group" isExecutable="true">
<bpmn:startEvent id="RecpSrvc_Billing_Initiator_Start" name="RecpSrvc Billing Initiator Start">
<bpmn:outgoing>Flow_1jexk4p</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_0qqrwpx" messageRef="Message_1suodic" />
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_1jexk4p" sourceRef="RecpSrvc_Billing_Initiator_Start" targetRef="RecpSrvc_Billing_Initiator_DuplicateEventValidation" />
<bpmn:endEvent id="Event_1tqof64" name="End Event">
<bpmn:incoming>Flow_06bsz4a</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_06bsz4a" sourceRef="RecpSrvc_Billing_Initiator_DuplicateEventValidation" targetRef="Event_1tqof64" />
<bpmn:serviceTask id="RecpSrvc_Billing_Initiator_DuplicateEventValidation" name="Validating the Received message" camunda:delegateExpression="#{BillingDelegate}">
<bpmn:incoming>Flow_1jexk4p</bpmn:incoming>
<bpmn:outgoing>Flow_06bsz4a</bpmn:outgoing>
</bpmn:serviceTask>
<bpmn:textAnnotation id="TextAnnotation_0kpt6hp">
<bpmn:text>Receives publish billing event from MQ series</bpmn:text>
</bpmn:textAnnotation>
<bpmn:association id="Association_0ee69pk" sourceRef="RecpSrvc_Billing_Initiator_Start" targetRef="TextAnnotation_0kpt6hp" />
<bpmn:textAnnotation id="TextAnnotation_1k38r2c">
<bpmn:text>Validating the received XML</bpmn:text>
</bpmn:textAnnotation>
<bpmn:association id="Association_1kyku2l" sourceRef="RecpSrvc_Billing_Initiator_DuplicateEventValidation" targetRef="TextAnnotation_1k38r2c" />
<bpmn:textAnnotation id="TextAnnotation_1qzlsj7">
<bpmn:text>Ending the flow</bpmn:text>
</bpmn:textAnnotation>
<bpmn:association id="Association_1my84xb" sourceRef="Event_1tqof64" targetRef="TextAnnotation_1qzlsj7" />
</bpmn:process>
<bpmn:message id="Message_1suodic" name="RecpSrvc_Billing_Initiator_Message" />
<bpmn:message id="Message_1vnhkcs" name="RecpSrvc_Billing_Pre_Processor_Msg" />
<bpmn:error id="Error_14k81br" name="BPMN_ERROR" errorCode="BPMN_ERROR" />
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Collaboration_1g0hs7s">
<bpmndi:BPMNShape id="Participant_07448el_di" bpmnElement="RCS_BillingEvent_Initiator" isHorizontal="true">
<dc:Bounds x="160" y="80" width="670" height="350" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="TextAnnotation_1k38r2c_di" bpmnElement="TextAnnotation_1k38r2c">
<dc:Bounds x="530" y="150" width="100" height="40" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="TextAnnotation_1qzlsj7_di" bpmnElement="TextAnnotation_1qzlsj7">
<dc:Bounds x="710" y="220" width="100" height="30" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1jexk4p_di" bpmnElement="Flow_1jexk4p">
<di:waypoint x="246" y="327" />
<di:waypoint x="410" y="327" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_06bsz4a_di" bpmnElement="Flow_06bsz4a">
<di:waypoint x="510" y="327" />
<di:waypoint x="672" y="327" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="Event_1btznrj_di" bpmnElement="RecpSrvc_Billing_Initiator_Start">
<dc:Bounds x="210" y="309" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="190" y="352" width="81" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="TextAnnotation_0kpt6hp_di" bpmnElement="TextAnnotation_0kpt6hp">
<dc:Bounds x="230" y="150" width="100" height="68" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1tqof64_di" bpmnElement="Event_1tqof64">
<dc:Bounds x="672" y="309" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="667" y="352" width="47" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1efnk0s_di" bpmnElement="RecpSrvc_Billing_Initiator_DuplicateEventValidation">
<dc:Bounds x="410" y="287" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Association_0ee69pk_di" bpmnElement="Association_0ee69pk">
<di:waypoint x="234" y="310" />
<di:waypoint x="263" y="218" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Association_1kyku2l_di" bpmnElement="Association_1kyku2l">
<di:waypoint x="484" y="287" />
<di:waypoint x="541" y="190" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Association_1my84xb_di" bpmnElement="Association_1my84xb">
<di:waypoint x="701" y="313" />
<di:waypoint x="749" y="250" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
13. Replace the contents of pom.xml with the following.
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>JMSCamundaThrottleDemo</groupId>
<artifactId>JMSCamundaThrottleDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>JMSCamundaThrottleDemo</name>
<description>JMSCamundaThrottleDemo</description>
<properties>
<camunda.version>7.14.0</camunda.version>
<cxf.version>3.3.6</cxf.version>
<camundaSpringBoot.version>7.14.0</camundaSpringBoot.version>
<springBoot.version>2.2.5.RELEASE</springBoot.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<version.java>1.8</version.java>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<failOnMissingWebXml>false</failOnMissingWebXml>
</properties>
<dependencies>
<dependency>
<groupId>org.camunda.bpm.springboot</groupId>
<artifactId>camunda-bpm-spring-boot-starter-webapp</artifactId>
<version>7.16.0-alpha2</version>
</dependency>
<dependency>
<groupId>org.camunda.bpm.springboot</groupId>
<artifactId>camunda-bpm-spring-boot-starter-rest</artifactId>
<version>7.16.0-alpha2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.3.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.camunda.spin</groupId>
<artifactId>camunda-spin-dataformat-all</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.camunda.bpm</groupId>
<artifactId>camunda-engine-plugin-spin</artifactId>
<version>7.16.0-alpha2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.camunda.spin</groupId>
<artifactId>camunda-spin-core</artifactId>
<version>1.10.1</version>
</dependency>
<!-- PostgreSQL -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.22</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>mq-jms-spring-boot-starter</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-client</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-spring-jaeger-cloud-starter</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.200</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>6.6</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.3.8</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>camunda-bpm-nexus</id>
<name>Camunda Maven Repository</name>
<url>https://app.camunda.com/nexus/content/groups/public</url>
</repository>
<!-- enable this for EE dependencies (requires credentials in ~/.m2/settings.xml)-->
<!-- <repository>
<id>camunda-bpm-nexus-ee</id>
<name>Camunda Enterprise Maven Repository</name>
<url>https://app.camunda.com/nexus/content/repositories/camunda-bpm</url>
</repository> -->
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.5.1</version>
</plugin>
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>0.4.0</version>
</plugin>
</plugins>
</build>
</project>
Testing
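1. Run the Camunda application. It listens on port 8080 (server.port in application.properties). The URLs below assume it runs on localhost.
2. Start the JMS listener: POST http://localhost:8080/jms/publishBilling/start
3. Stop the JMS listener: POST http://localhost:8080/jms/publishBilling/stop
4. Check the listener status: POST http://localhost:8080/jms/publishBilling/status
5. Set maxConnectionSize: POST http://localhost:8080/jms/maxConnectionSize with the new size (for example, 20) as the request body. A GET on the same URL returns the current value.
Each POST handler in O2JMSController declares a @RequestBody String parameter, so send a non-empty body with every POST request.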
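The endpoints exposed by O2JMSController can also be called from code instead of a REST client. The following is a minimal sketch using only the JDK's HttpURLConnection. It assumes the application runs on http://localhost:8080, and the class name JmsThrottleClient and the placeholder request bodies are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class JmsThrottleClient {

    private final String baseUrl;

    public JmsThrottleClient(String baseUrl) {
        // Drop a trailing slash so that paths can always start with "/".
        this.baseUrl = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
    }

    /** Builds the full URL for a path under the controller's /jms mapping. */
    public String url(String path) {
        return baseUrl + "/jms" + path;
    }

    /** Sends a request and returns the response body. Pass a null body for GET. */
    public String send(String method, String path, String body) {
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(url(path)).openConnection();
            conn.setRequestMethod(method);
            conn.setConnectTimeout(5000);
            conn.setReadTimeout(5000);
            if (body != null) {
                conn.setDoOutput(true);
                conn.setRequestProperty("Content-Type", "text/plain; charset=UTF-8");
                try (OutputStream out = conn.getOutputStream()) {
                    out.write(body.getBytes(StandardCharsets.UTF_8));
                }
            }
            // Error responses are delivered on the error stream, which may be null.
            InputStream in = conn.getResponseCode() >= 400 ? conn.getErrorStream() : conn.getInputStream();
            return in == null ? "" : readAll(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static String readAll(InputStream in) throws IOException {
        try (InputStream stream = in; ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = stream.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        JmsThrottleClient client = new JmsThrottleClient("http://localhost:8080");
        try {
            System.out.println(client.send("POST", "/publishBilling/status", "status"));
            System.out.println(client.send("POST", "/maxConnectionSize", "20"));
            System.out.println(client.send("GET", "/maxConnectionSize", null));
        } catch (UncheckedIOException e) {
            System.out.println("Application not reachable: " + e.getMessage());
        }
    }
}
```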
This concludes the walkthrough of JMS throttling with a Camunda application.