- 浏览: 279801 次
- 性别:
- 来自: 湖南
文章分类
- 全部博客 (120)
- Struts 1.框架 (12)
- Spring框架 (9)
- hibernate框架 (6)
- web 综合 (15)
- Dwr (1)
- struts 2 (19)
- 设计模式 (0)
- lucene (6)
- oracle (3)
- linux (10)
- mysql (4)
- jquery (0)
- AJAX (1)
- javaScript (1)
- J2SE (4)
- IBATIS (3)
- JPA (1)
- Compass (3)
- 报表 (1)
- 任务调度 (1)
- tomcat (3)
- RMI (0)
- JMS (2)
- WebService (2)
- SOAP (0)
- XML (1)
- 多线程 (8)
- 缓存 (2)
- nginx (3)
- mongodb (1)
- ant打包 (0)
最新评论
-
iceman1952:
Hi 你觉得很好的那两三篇 百度文库的链接,能贴一下吗?
内网穿透&UDP打洞 -
ice86rain:
里面貌似没有用到Lucene
Struts2+Hibernate3.2+Spring2.5+Compass整合 -
sgq0085:
写得非常详细 好文章
JMS之ActiveMQ Linux下安装与应用实例 -
強顔歓笶:
JMS之ActiveMQ Linux下安装与应用实例 -
yixiandave:
forcer521 写道不指定所有子目录都在一起的话,这样用源 ...
linux下nginx稳定版1.6.2安装
1.下载activeMQ安装包,拷贝到/activeMQ目录下
apache-activemq-5.10.0-bin.tar.gz,下载地址http://activemq.apache.org/download.html
2.解压文件到运行目录
3.为了方便管理,重命名
4.启动服务
5.查看是否启动成功
6.停止服务
到此环境准备成功
demo应用
----------------------------------------------------------
点对点:
生产者:
消费者:
运行结果:
===========================================================================
发布与订阅:
-----------------------
消息发布者
消息订阅者:
apache-activemq-5.10.0-bin.tar.gz,下载地址http://activemq.apache.org/download.html
2.解压文件到运行目录
[root@iZ94wmbxqzyZ softs]# tar -xzvf /server/apache-activemq-5.10.0-bin.tar.gz
3.为了方便管理,重命名
[root@iZ94wmbxqzyZ softs]# mv apache-activemq-5.10.0 activemq-5.10.0 [root@iZ94wmbxqzyZ softs]# cd activemq-5.10.0/ [root@iZ94wmbxqzyZ activemq-5.10.0]# ll total 6304 -rwxr-xr-x 1 root root 6371237 Jun 5 2014 activemq-all-5.10.0.jar drwxr-xr-x 5 root root 4096 Jan 11 23:31 bin drwxr-xr-x 2 root root 4096 Jan 11 23:31 conf drwxr-xr-x 2 root root 4096 Jan 11 23:31 data drwxr-xr-x 2 root root 4096 Jan 11 23:31 docs drwxr-xr-x 8 root root 4096 Jan 11 23:31 examples drwxr-xr-x 6 root root 4096 Jan 11 23:31 lib -rw-r--r-- 1 root root 40580 Jun 5 2014 LICENSE -rw-r--r-- 1 root root 3334 Jun 5 2014 NOTICE -rw-r--r-- 1 root root 2610 Jun 5 2014 README.txt drwxr-xr-x 7 root root 4096 Jan 11 23:31 webapps drwxr-xr-x 3 root root 4096 Jan 11 23:31 webapps-demo [root@iZ94wmbxqzyZ activemq-5.10.0]# cd bin/ [root@iZ94wmbxqzyZ bin]# ll total 152 -rwxr-xr-x 1 root root 22126 Jun 5 2014 activemq -rwxr-xr-x 1 root root 5665 Jun 5 2014 activemq-admin -rw-r--r-- 1 root root 15954 Jun 5 2014 activemq.jar -rwxr-xr-x 1 root root 6189 Jun 5 2014 diag drwxr-xr-x 2 root root 4096 Jan 11 23:31 linux-x86-32 drwxr-xr-x 2 root root 4096 Jan 11 23:31 linux-x86-64 drwxr-xr-x 2 root root 4096 Jan 11 23:31 macosx -rwxr-xr-x 1 root root 83820 Jun 5 2014 wrapper.jar
4.启动服务
[root@iZ94wmbxqzyZ bin]# ./activemq start INFO: Using default configuration (you can configure options in one of these file: /etc/default/activemq /root/.activemqrc) INFO: Invoke the following command to create a configuration file ./activemq setup [ /etc/default/activemq | /root/.activemqrc ] INFO: Using java '/softs/jdk1.6.0_30/bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details INFO: pidfile created : '/softs/activemq-5.10.0/data/activemq-iZ94wmbxqzyZ.pid' (pid '28962')
5.查看是否启动成功
[root@iZ94wmbxqzyZ bin]# [root@iZ94wmbxqzyZ bin]# ps -ef | grep activemq root 28962 1 32 23:32 pts/0 00:00:04 /softs/jdk1.6.0_30/bin/java -Xms1G -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/softs/activemq-5.10.0/conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/softs/activemq-5.10.0/tmp -Dactivemq.classpath=/softs/activemq-5.10.0/conf; -Dactivemq.home=/softs/activemq-5.10.0 -Dactivemq.base=/softs/activemq-5.10.0 -Dactivemq.conf=/softs/activemq-5.10.0/conf -Dactivemq.data=/softs/activemq-5.10.0/data -jar /softs/activemq-5.10.0/bin/activemq.jar start root 29011 28898 0 23:32 pts/0 00:00:00 grep activemq [root@iZ94wmbxqzyZ bin]# [root@iZ94wmbxqzyZ bin]#
6.停止服务
[root@iZ94wmbxqzyZ data]# [root@iZ94wmbxqzyZ data]# kill 28962 [root@iZ94wmbxqzyZ data]# [root@iZ94wmbxqzyZ data]# ps -ef | grep activemq root 29078 28898 0 23:42 pts/0 00:00:00 grep activemq [root@iZ94wmbxqzyZ data]#
到此环境准备成功
demo应用
package com.wzh.activemq; import java.io.Serializable; public class User implements Serializable{ private static final long serialVersionUID = 1L; private String username ; private String password ; public User(String username,String password){ this.username = username ; this.password = password ; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override public String toString() { // TODO Auto-generated method stub return "[username="+username+",password="+password+"]" ; } }
----------------------------------------------------------
点对点:
生产者:
package com.wzh.activemq; import java.io.Serializable; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class P2PMessageProducer { protected String username = ActiveMQConnection.DEFAULT_USER; protected String password = ActiveMQConnection.DEFAULT_PASSWORD; //protected String brokerURL = "tcp://127.0.0.1:61616"; protected String brokerURL = "tcp://120.24.85.167:61616"; protected static transient ConnectionFactory factory; protected transient Connection connection; public static void main(String[] args) { try { new P2PMessageProducer().sendObjectMessage(new User("wzh","q123456")); new P2PMessageProducer().sendMapMessage(); new P2PMessageProducer().sendTextMessage("海,你好"); } catch (Exception e) { e.printStackTrace(); } } public P2PMessageProducer() { try { factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); connection.start(); } catch (JMSException jmse) { close(); } } /** * 初始化连接信息 */ public P2PMessageProducer(String username, String password, String brokerURL) throws JMSException { this.username = username; this.password = password; this.brokerURL = brokerURL; factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } } /** * 关闭连接 */ public void close() { try { if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } protected void sendObjectMessage(Serializable serializable) throws JMSException { Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("MessageQueue"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); Message message = session.createObjectMessage(serializable); producer.send(message); session.commit(); } catch (JMSException e) { try { session.rollback() ; } catch (JMSException e1) { e1.printStackTrace(); } throw e ; } finally { close(); } } protected void sendTextMessage(String text) throws JMSException { Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("MessageQueue"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); Message message = session.createTextMessage(text); producer.send(message); session.commit(); } catch (JMSException e) { try { session.rollback() ; } catch (JMSException e1) { e1.printStackTrace(); } throw e ; } finally { close(); } } protected void sendMapMessage() throws JMSException { Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("MessageQueue"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); MapMessage message = session.createMapMessage(); message.setString("stock", "string"); message.setDouble("price", 11.14); producer.send(message); session.commit(); } catch (JMSException e) { try { session.rollback() ; } catch (JMSException e1) { e1.printStackTrace(); } throw e ; } finally { close(); } } }
消费者:
package com.wzh.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class P2PMessageConsumer { protected String username = ActiveMQConnection.DEFAULT_USER; protected String password = ActiveMQConnection.DEFAULT_PASSWORD; //protected String brokerURL = "tcp://127.0.0.1:61616"; protected String brokerURL = "tcp://120.24.85.167:61616"; protected static transient ConnectionFactory factory; protected transient Connection connection; public static void main(String[] args) { P2PMessageConsumer consumer = new P2PMessageConsumer(); consumer.receiveMessage(); } public P2PMessageConsumer() { try { factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); connection.start(); } catch (JMSException jmse) { close(); } } public P2PMessageConsumer(String username, String password, String brokerURL) throws JMSException { this.username = username; this.password = password; this.brokerURL = brokerURL; factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } } public void close() { try { if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } protected void receiveMessage() { Session session = null; try { session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("MessageQueue"); MessageConsumer consumer = session.createConsumer(destination); while (true) { Message message = consumer.receive(); if (null != message) { if (message instanceof ObjectMessage) { System.out.println("deal ObjectMessage...."); dealObjectMessage((ObjectMessage) message); } else if (message instanceof MapMessage) { System.out.println("deal MapMessage...."); dealMapMessage((MapMessage) message); } else if (message instanceof TextMessage) { System.out.println("deal TextMessage...."); dealTextMessage((TextMessage) message); } } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { if (session != null) { try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * * 处理 TextMessage消息 * * @throws JMSException */ private void dealTextMessage(TextMessage message) throws JMSException { String text = message.getText(); System.out.println("text = " + text); } /** * * 处理 MapMessage消息 * * @throws JMSException */ private void dealMapMessage(MapMessage message) throws JMSException { String stack = message.getString("stock"); Double price = message.getDouble("price"); System.out.println("stock = " + stack + " , price =" + price); } /** * 处理ObjectMessage消息 */ private void dealObjectMessage(ObjectMessage message) throws JMSException { User user = (User) message.getObject(); System.out.println(user.toString()); } }
运行结果:
deal ObjectMessage.... [username=wzh,password=q123456] deal MapMessage.... stock = string , price =11.14 deal TextMessage.... text = 海,你好
===========================================================================
发布与订阅:
-----------------------
消息发布者
package com.wzh.activemq; import java.io.Serializable; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Publish { protected String username = ActiveMQConnection.DEFAULT_USER; protected String password = ActiveMQConnection.DEFAULT_PASSWORD; //protected String brokerURL = "tcp://127.0.0.1:61616"; protected String brokerURL = "tcp://120.24.85.167:61616"; protected static transient ConnectionFactory factory; protected transient Connection connection; public static void main(String[] args) { try { new Publish().sendObjectMessage(new User("wzh","q123456")); new Publish().sendMapMessage(); new Publish().sendTextMessage("海,你好"); } catch (Exception e) { e.printStackTrace(); } } public Publish() { try { factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); connection.start(); } catch (JMSException jmse) { close(); } } public Publish(String username, String password, String brokerURL) throws JMSException { this.username = username; this.password = password; this.brokerURL = brokerURL; factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } } public void close() { try { if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } protected void sendObjectMessage(Serializable serializable) throws JMSException { Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("MessageTopic"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); Message message = session.createObjectMessage(serializable); producer.send(message); session.commit(); } catch (JMSException e) { try { session.rollback() ; } catch (JMSException e1) { e1.printStackTrace(); } throw e ; } finally { close(); } } protected void sendTextMessage(String text) throws JMSException { Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("MessageTopic"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); Message message = session.createTextMessage(text); producer.send(message); session.commit(); } catch (JMSException e) { try { session.rollback() ; } catch (JMSException e1) { e1.printStackTrace(); } throw e ; } finally { close(); } } protected void sendMapMessage() throws JMSException { Session session = null; try { session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("MessageTopic"); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); MapMessage message = session.createMapMessage(); message.setString("stock", "string"); message.setDouble("price", 11.14); producer.send(message); session.commit(); } catch (JMSException e) { try { session.rollback() ; } catch (JMSException e1) { e1.printStackTrace(); } throw e ; } finally { close(); } } }
消息订阅者:
package com.wzh.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Subscriber { protected String username = ActiveMQConnection.DEFAULT_USER; protected String password = ActiveMQConnection.DEFAULT_PASSWORD; //protected String brokerURL = "tcp://127.0.0.1:61616"; protected String brokerURL = "tcp://120.24.85.167:61616"; protected static transient ConnectionFactory factory; protected transient Connection connection; public static void main(String[] args) { Subscriber consumer = new Subscriber(); consumer.receiveMessage(); } public Subscriber() { try { factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); connection.start(); } catch (JMSException jmse) { close(); } } public Subscriber(String username, String password, String brokerURL) throws JMSException { this.username = username; this.password = password; this.brokerURL = brokerURL; factory = new ActiveMQConnectionFactory(username, password, brokerURL); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } } public void close() { try { if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } protected void receiveMessage() { Session session = null; try { session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("MessageTopic"); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (message instanceof ObjectMessage) { System.out.println("deal ObjectMessage...."); dealObjectMessage((ObjectMessage) message); } else if (message instanceof MapMessage) { System.out.println("deal MapMessage...."); dealMapMessage((MapMessage) message); } else if (message instanceof TextMessage) { System.out.println("deal TextMessage...."); dealTextMessage((TextMessage) message); } } }) ; } catch (Exception e) { e.printStackTrace(); } finally { /*if (session != null) { try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } }*/ } } /** * * 处理 TextMessage消息 * * @throws JMSException */ private void dealTextMessage(TextMessage message) { try { String text = message.getText(); System.out.println("text = " + text); } catch (JMSException e) { e.printStackTrace(); } } /** * * 处理 MapMessage消息 * * @throws JMSException */ private void dealMapMessage(MapMessage message){ try { String stack = message.getString("stock"); Double price = message.getDouble("price"); System.out.println("stock = " + stack + " , price =" + price); } catch (JMSException e) { e.printStackTrace(); } } /** * 处理ObjectMessage消息 */ private void dealObjectMessage(ObjectMessage message){ try { User user = (User) message.getObject(); System.out.println(user.toString()); } catch (JMSException e) { e.printStackTrace(); } } }
相关推荐
Java二进制IO类与文件复制操作实例,好像是一本书的例子,源代码有的是独立运行的,与同目录下的其它代码文件互不联系,这些代码面向初级、中级Java程序员。 Java访问权限控制源代码 1个目标文件 摘要:Java源码,...
Java二进制IO类与文件复制操作实例,好像是一本书的例子,源代码有的是独立运行的,与同目录下的其它代码文件互不联系,这些代码面向初级、中级Java程序员。 Java访问权限控制源代码 1个目标文件 摘要:Java源码,...
开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的缩写,它绝不是简单的另外一种模板引擎,而是新一代的模板引擎,...
开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的缩写,它绝不是简单的另外一种模板引擎,而是新一代的模板引擎,...
开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的缩写,它绝不是简单的另外一种模板引擎,而是新一代的模板引擎,...
开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的缩写,它绝不是简单的另外一种模板引擎,而是新一代的模板引擎,...
开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的缩写,它绝不是简单的另外一种模板引擎,而是新一代的模板引擎,...
开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的缩写,它绝不是简单的另外一种模板引擎,而是新一代的模板引擎,...
开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的缩写,它绝不是简单的另外一种模板引擎,而是新一代的模板引擎,...
开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的缩写,它绝不是简单的另外一种模板引擎,而是新一代的模板引擎,...
开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的缩写,它绝不是简单的另外一种模板引擎,而是新一代的模板引擎,...
开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的缩写,它绝不是简单的另外一种模板引擎,而是新一代的模板引擎,...
开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的缩写,它绝不是简单的另外一种模板引擎,而是新一代的模板引擎,...
开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的缩写,它绝不是简单的另外一种模板引擎,而是新一代的模板引擎,...
开发它是用于在UTF-8 Oracle实例中使用ASCII编码的Oracle 数据库中来正确的传输非ASCII字符。 Java模板语言 Beetl Beetl,是Bee Template Language的缩写,它绝不是简单的另外一种模板引擎,而是新一代的模板引擎,...