`

JMS Api Demo

    博客分类:
  • j2ee
阅读更多
jms.JMSFactory
package jms;
import javax.jms.TopicConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSFactory {
	private static ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
	
	public static TopicConnectionFactory getActiveMQConnectionFactory(){
		return activeMQConnectionFactory;
	}
}

jms.JMSMessageActor
package jms;
import javax.jms.*;
public abstract class JMSMessageActor {
	protected String name=null;
	protected String defaultQueueName="defalut-queue";
	protected String defaultTopicName="defalut-topic";
	protected final int  DESTIONATION_TYPE_TOPIC=1;
	protected final int DESTIONATION_TYPE_QUEUE=2;
	public JMSMessageActor(String name) {
		this.name = name;
	}
	public abstract Destination  getDestination();
	public Destination createDefaultDestination(int type){
		Destination dest=null;
		switch(type){
			case DESTIONATION_TYPE_TOPIC:
				dest=new Topic(){
					@Override
					public String getTopicName() throws JMSException {
						return defaultTopicName;
					}};break;
			case DESTIONATION_TYPE_QUEUE:
				dest=new Queue(){
					@Override
					public String getQueueName() throws JMSException {
						return defaultQueueName;
					}};
				break;
		}
		return dest;
	}
}

jms.JMSMessageConsumer
package jms;
import javax.jms.*;
public abstract class JMSMessageConsumer extends JMSMessageActor implements Runnable,MessageListener{
	public JMSMessageConsumer(String name){
		super(name);
	}
	@Override
	public void onMessage(Message message) {
		synchronized(JMSMessageConsumer.class){
			System.out.println("##### consumer "+ name +" receive message. #####");
			System.out.println(JMSUtil.formatMessage(message));
		}
	}
	@Override
	public void run() {
		try{			
			// get topic connect factory
			ConnectionFactory factory = JMSFactory.getActiveMQConnectionFactory();
			// create connection
			Connection connection = factory.createConnection();
			// create unique client id for the connection
			connection.setClientID("consumer_connection_"+name);
			// if the connection start method is not invoked , the consumer may be not receive the message
			connection.start();
			// create session
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			Destination destination=getDestination();
			// if the destination is an instance of Queue , 
			// it will receive the message from the queue,
			// in other words the message can be consumed one time by one consumer 
			// and the message is durable.
			// if the destination is an instance of Topic , 
			// the subscribers of the Topic can receive the message,
			// but the message is non durable.
			MessageConsumer consumer  =session.createConsumer(destination,null,true);
			// if the destination is an instance of Topic, 
			// specify the clientID of the connection
			// and create MessageConsumer like this,
			// the subscribers of the Topic can receive the message
			// and the message is durable.
			//consumer =session.createDurableSubscriber((Topic)destination, "durable topic", null, true);
			consumer.setMessageListener(this);
		}catch(Exception e){
			throw new RuntimeException(e);
		}
	}
}

jms.JMSMessageProducer
package jms;
import javax.jms.*;
public abstract class JMSMessageProducer extends JMSMessageActor implements Runnable{
	public JMSMessageProducer(String name){
		super(name);
	}
	@Override
	public void run() {
		try{
			// get topic connect factory
			ConnectionFactory factory = JMSFactory.getActiveMQConnectionFactory(); 
			// create connection
			Connection connection = factory.createConnection();
			// create session
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			Destination destination=getDestination();
			// create message producer
			MessageProducer producer = session.createProducer(destination);
			// create message
			MapMessage mapMessage=session.createMapMessage();
			mapMessage.setObjectProperty("lock", "key");
			mapMessage.setObjectProperty("vegetables", "greens");
			mapMessage.setObjectProperty("fruit", "apple");
			mapMessage.setObjectProperty("meat", "pork");
			
			// producer.setTimeToLive(1000);// set the message expiration time
			
			producer.send(mapMessage);// send message
			connection.close();
			System.out.println(name+" send message success!");
		}catch(Exception e){
			throw new RuntimeException(e);
		}
	}
}

jms.JMSMessageQueueReceiver
package jms;
import javax.jms.Destination;
public class JMSMessageQueueReceiver extends JMSMessageConsumer {
	public JMSMessageQueueReceiver(String name) {
		super(name);
	}
	@Override
	public Destination getDestination() {
		return createDefaultDestination(DESTIONATION_TYPE_QUEUE);
	}
}

jms.JMSMessageQueueSender
package jms;
import javax.jms.Destination;
public class JMSMessageQueueSender extends JMSMessageProducer {
	public JMSMessageQueueSender(String name) {
		super(name);
	}
	@Override
	public Destination getDestination() {
		return createDefaultDestination(DESTIONATION_TYPE_QUEUE);
	}
}

jms.JMSMessageTopicPublisher
package jms;
import javax.jms.Destination;
public class JMSMessageTopicPublisher extends JMSMessageProducer {
	public JMSMessageTopicPublisher(String name) {
		super(name);
	}
	@Override
	public Destination getDestination() {
		return createDefaultDestination(DESTIONATION_TYPE_TOPIC);
	}
}

jms.JMSMessageTopicSubscriber
package jms;
import javax.jms.Destination;
public class JMSMessageTopicSubscriber extends JMSMessageConsumer {
	public JMSMessageTopicSubscriber(String name) {
		super(name);
	}
	@Override
	public Destination getDestination() {
		return createDefaultDestination(DESTIONATION_TYPE_TOPIC);
	}
}

jms.JMSUtil
package jms;
import java.text.SimpleDateFormat;
import java.util.*;
import javax.jms.*;
public class JMSUtil {
	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	@SuppressWarnings("unchecked")
	public static String formatMessage(Message message) {
		StringBuffer result=new StringBuffer();
		try {
			String correlationId=message.getJMSCorrelationID();
			int deliveryMode=message.getJMSDeliveryMode();
			long expiration=message.getJMSExpiration();
			String messageId=message.getJMSMessageID();
			int priority=message.getJMSPriority();
			long timestamp=message.getJMSTimestamp();
			String type=message.getJMSType();
			result.append("##### message property #####\n")
				  .append("correlationId : "+correlationId+"\n")
				  .append("deliveryMode : "+deliveryMode+"\n")
				  .append("expiration : "+expiration+"\n")
				  .append("messageId : "+messageId+"\n")
				  .append("priority : "+priority+"\n")
				  .append("timestamp : "+sdf.format(new Date(timestamp))+"\n")
				  .append("type : "+type+"\n");
			Enumeration<String> names=message.getPropertyNames();
			result.append("##### message content #####\n");
			while(names.hasMoreElements()){
				String name=names.nextElement();
				String value=message.getStringProperty(name);
				result.append(name +" : "+value+"\n");
			}
		} catch (JMSException e) {
			throw new RuntimeException(e);
		}
		return result.toString();
	}
}

jms.Main
package jms;
public class Main {
	public static void main(String[] args) throws InterruptedException {
		sendMesasge();
		Thread.sleep(2000);
		createMessageReceiver();
		
		Thread.sleep(3000);
		System.exit(0);
	}
	/* create message receiver */
	private static void createMessageReceiver(){
		// create queue message receiver
		for(int i=0;i<3;i++){
			Thread t=new Thread(new JMSMessageQueueReceiver("queue_receiver_"+i));
			t.start();
		}
		// create topic message subscriber
		for(int i=0;i<3;i++){
			Thread t=new Thread(new JMSMessageTopicSubscriber("topic_subscriber_"+i));
			t.start();
		}
	}
	/* send message */
	private static void sendMesasge(){
		// create queue message sender
		for(int i=0;i<3;i++){
			Thread t=new Thread(new JMSMessageQueueSender("queue_sender_"+i));
			t.start();
		}
		// create topic publisher
		for(int i=0;i<3;i++){
			Thread t=new Thread(new JMSMessageTopicPublisher("topic_publisher_"+i));
			t.start();
		}
	}
}
  • jms.zip (5.1 KB)
  • 下载次数: 139
分享到:
评论
1 楼 陈碧滔 2013-08-12  
Apache ActiveMQ

相关推荐

    JMS 简单demo

    JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的...

    apache-camel-spring-demo

    Apache Camel uses URIs to work directly with any kind of Transport or messaging model such as HTTP, ActiveMQ, JMS, JBI, SCA, MINA or CXF, as well as pluggable Components and Data Format options....

    springcore-demo

    Java对象ORM : 对象关系映射OXM : 对象XML映射JMS : Java消息服务JDBC : Java数据库连接Transactions : 事务管理(数据库)Spring的设计哲学:在每个层次都提供选择容纳不同的观点保持强烈的向后兼容性关心API设计为...

    Java消息服务

    Java消息服务JMS是对消息支持服务传送的标准API的完全解读.

    fourinone-3.04.25

    但是Fourinone不实现JMS的规范,不提供JMS的消息确认和消息过滤等特殊功能,不过开发者可以基于Fourinone自己去扩充这些功能,包括mq集群,利用一个独立的domain/node建立队列或者主题的key隐射,再仿照上面分布式...

    单点登录源码

    ├── zheng-demo-rpc-api -- rpc接口包 ├── zheng-demo-rpc-service -- rpc服务提供者 └── zheng-demo-web -- 演示示例[端口:8888] ``` ### 技术选型 #### 后端技术: 技术 | 名称 | 官网 ----|------|-...

    Fourinone分布式并行计算四合一框架

    但是Fourinone不实现JMS的规范,不提供JMS的消息确认和消息过滤等特殊功能,不过开发者可以基于Fourinone自己去扩充这些功能,包括mq集群,利用一个独立的domain/node建立队列或者主题的key隐射,再仿照上面分布式...

    spring boot 全面的样例代码

    - chapter5-1-1:[JMS(未完成)] - chapter5-2-1:[Spring Boot中使用RabbitMQ](http://blog.didispace.com/spring-boot-rabbitmq/) ### 其他功能 - chapter6-1-1:[使用Spring StateMachine框架实现状态机]...

    java 面试题 总结

    JAVA相关基础知识 1、面向对象的特征有哪些方面 1.抽象: 抽象就是忽略一个主题中与当前目标无关的那些方面,以便更充分地注意与当前目标有关的方面。抽象并不打算了解全部问题,而只是选择其中的一部分,暂时不用...

    超级有影响力霸气的Java面试题大全文档

    超级有影响力的Java面试题大全文档 1.抽象: 抽象就是忽略一个主题中与当前目标无关的那些方面,以便更充分地注意与当前目标有关的方面。抽象并不打算了解全部问题,而只是选择其中的一部分,暂时不用部分细节。...

Global site tag (gtag.js) - Google Analytics