Search

'ActiveMQ'에 해당되는 글 2건

  1. 2014.09.25 Camel Header 설정
  2. 2012.01.29 Tomcat + Spring + ActiveMQ (1)

Camel Header 설정

Programming/Java 2014.09.25 10:19 Posted by 파란크리스마스

참고 : http://camel.apache.org/jms.html

RouteBuilder

	from("jms:queue:test1?maxConcurrentConsumers=1&maxMessagesPerTask=1&autoStartup=true&preserveMessageQos=true")
	.bean(com.bluexmas.Test1Queue.class, "doTest")
	.to("jms:queue:test2");
	
	from("jms:queue:test2?maxConcurrentConsumers=1&maxMessagesPerTask=1&autoStartup=true&preserveMessageQos=true")
	.bean(com.bluexmas.Test2Queue.class, "doTest")
	.to("stream:out");

Header1Object

package com.bluexmas;

import java.io.Serializable;

public class Header1Object implements Serializable {
	
	private static final long serialVersionUID = 1L;
	
	int age;
	String name;
	
	public Header1Object(int age, String name) {
		super();
		this.age = age;
		this.name = name;
	}

	@Override
	public String toString() {
		return "Header1Object [age=" + age + ", name=" + name + "]";
	}
}

Test1Queue

package com.bluexmas;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.Message;

import com.uni.fep.bean.service.MQSenderService;
import com.uni.fep.bean.util.BeanUtils;
import com.uni.fep.bean.util.EnvironmentInit;

public class Test1Queue {
	
	public void doTest(Exchange exchange) {
		System.out.println("-- Test1Queue.doTest ----------------------------");
		
		Message in = exchange.getIn();
		
		System.out.println("body = " + in.getBody());
		System.out.println("header1 = " + in.getHeader("header1"));
		System.out.println("header2 = " + in.getHeader("header2"));
		
		in.setHeader("header1", "value1-1");
		in.setHeader("header2", "value2");
		
		List<String> listValue = new ArrayList<String>();
		listValue.add("list1");
		listValue.add("list2");
		in.setHeader("header3", listValue);
		System.out.println("header3 = " + in.getHeader("header3"));
		
		in.setHeader("header4", new Header1Object(10, "bluexmas"));
		System.out.println("header4 = " + in.getHeader("header4"));
	}
	
	public static void main(String[] args) throws Exception {
		EnvironmentInit.init();
		
		MQSenderService service = (MQSenderService)BeanUtils.getBean("MQSenderService");
		
		String message = "Hello Camel";
		
		Map<String, Object>map=new HashMap<String,Object>();
		map.put("header1", "value1");
		
		service.doSend("test1", map, message);
		
		Thread.sleep(5000);
		
		System.exit(0);
	}

}

Test2Queue

package com.bluexmas;

import org.apache.camel.Exchange;
import org.apache.camel.Message;

public class Test2Queue {

	public void doTest(Exchange exchange) {
		System.out.println("-- Test2Queue.doTest ----------------------------");
		
		Message in = exchange.getIn();
		
		System.out.println("body = " + in.getBody());
		System.out.println("header1 = " + in.getHeader("header1"));
		System.out.println("header2 = " + in.getHeader("header2"));
		System.out.println("header3 = " + in.getHeader("header3"));
		System.out.println("header4 = " + in.getHeader("header4"));
	}
	
}

출력결과

-- Test1Queue.doTest ----------------------------
body = Hello Camel
header1 = value1
header2 = null
header3 = [list1, list2]
header4 = Header1Object [age=10, name=bluexmas]
-- Test2Queue.doTest ----------------------------
body = Hello Camel
header1 = value1-1
header2 = value2
header3 = null
header4 = null
Hello Camel

설명

Camel에서 기본적으로 Header의 내용은 Queue 넘어 갈때 초기화 되는데, 
초기화 하지 않고, 그대로 데이터를 가지고 다니려면 preserveMessageQos=true 해주면 된다.

출력결과를 보시면 알 수 있듯이 기본 자료형외에 객체는 전달 할 수가 없는 것으로 판단된다.

header1 : 초기에 전달된 Header의 값으로 그 값을 변경했을때 변경된 값이 전달되는 것을 확인
header2 : 초기에 전달되지 않았으나 추가로 Header에 추가된 값으로 Queue 이동 후 값이 존재하는 것을 확인
Header3 : ArreyList에 기본형 데이터(String) 추가해서 Header에 추가해서 전달했으나 List 전달되지 않는 것으로 확인
Header4 : 사용자가 임으로 만들 객체를 Header에 추가 했으나 Queue 이동 후 값이 전달되지 않는 것으로 확인

신고

Tomcat + Spring + ActiveMQ

Programming/Java 2012.01.29 03:56 Posted by 파란크리스마스
출처 : http://dbin318.tistory.com/13

웹페이지 호출 후 오래 걸리는 작업을 실행하는 경우 응답을 바로 할 수 없어서
MQ 시스템을 적용하게 되었습니다.

MQ(Message Queue) 시스템은
Message를 Queue에 전달하고, MQ시스템은 다시 Queue에서 Message을 받아서
Message을 실행하는 것으로, 시스템에서 처리 가능한 용량 만큼만 Queue에 Message을 받아서
처리 하게 됩니다.

부하가 많이 걸리는 SMS 시스템과 같은 시스템에 적용하게 됩니다.

저의 경우는 부하가 많이 걸리는 작업은 아니지만
특정 작업이 1시간 이상 걸리는 작업을 웹페이지에서 호출하는데,
호출후 바로 응답을 받을 수 없는 문제로 MQ 시스템을 적용하게 되었습니다.
(Message을 Queue에 전달하고 바로 응답 페이지를 보여줄 수 있기때문에...비동기식 호출이 가능)

자세한 설명은 출처 페이지에서 보실수 있습니다.

적용환경

Tomcat 7
Spring 3.0.4
ActiveMQ 5.4.3

test-activemq.xml
<?xml version="1.0" encoding="UTF-8"?>

<beans 
  xmlns="http://www.springframework.org/schema/beans" 
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

	<!-- a pooling based JMS provider -->
	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL">
			<value>vm://localhost</value>
		</property>
	</bean>

	<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>

	<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="dbin.queue" />
	</bean>

	<bean id="msgConverter" class="test.dbin.MsgConverterImpl" />
	<bean id="msgMdp" class="test.dbin.MsgMDP" />

	<bean id="purePojoMdp"
		class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<property name="delegate" ref="msgMdp" />
		<property name="defaultListenerMethod" value="handleMessage" />
		<property name="messageConverter" ref="msgConverter" />
	</bean>
	
	<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
		<property name="connectionFactory" ref="jmsFactory" />
		<property name="destination" ref="queueDestination" />
		<property name="messageListener" ref="purePojoMdp" />
	</bean>

	<!-- Spring JMS Template -->
	<bean id="producerJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsFactory" />
		<property name="defaultDestination" ref="queueDestination" />
		<property name="messageConverter" ref="msgConverter" />
	</bean>

	<bean id="producer" class="test.dbin.MsgProducerImpl">
		<property name="jmsTemplate" ref="producerJmsTemplate" />
	</bean>
</beans>

Msg.java
package test.dbin;

public class Msg {

	private String name;
	private String value;

	public void setName(String name) {
		this.name = name;
	}

	public String getName() {
		return name;
	}

	public void setValue(String value) {
		this.value = value;
	}

	public String getValue() {
		return value;
	}

	public String toString() {
		return "name = " + name + ", value = " + value;
	}

}

MsgProducer.java
package test.dbin;

import org.springframework.jms.core.JmsTemplate;

public interface MsgProducer {
 
	public void send(final Msg map);
 
	public void setJmsTemplate(JmsTemplate jmsTemplate);
 
}

MsgProducerImpl.java
package test.dbin;

import org.springframework.jms.core.JmsTemplate;

public class MsgProducerImpl implements MsgProducer {

	private JmsTemplate jmsTemplate;
 
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
 		this.jmsTemplate = jmsTemplate;
 	}
	
	public void send(final Msg msg) {		
 		jmsTemplate.convertAndSend(msg);
 	}	
 
}

MsgMDP.java
package test.dbin;

/**
 * Message Driven POJO
 *
 */
public class MsgMDP {
 
	public void handleMessage(Msg msg) {
		// handles the message
		System.out.println("handler called");
		System.out.println(msg);
	}
 
}

MsgConverterImpl.java
package test.dbin;
 
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

/**
 * MapMessage로의 conversion을 담당하는 class
 * 
 *
 */
public class MsgConverterImpl implements MessageConverter {

	public MsgConverterImpl() {

	}

	public Object fromMessage(Message message) throws JMSException, MessageConversionException {
		if( !(message instanceof MapMessage)) {
			throw new MessageConversionException("not MapMessage");
		}
		
		System.out.println("MsgConverterImpl.fromMessage");
		
		MapMessage mapMessage = (MapMessage)message;
		Msg msg = new Msg();
		msg.setName(mapMessage.getString("name"));
		msg.setValue(mapMessage.getString("value"));

		return msg;
	}

	public Message toMessage(Object object, Session session)  throws JMSException, MessageConversionException {
		if ( !(object instanceof Msg) ) {
			throw new MessageConversionException("not Msg");
		}
		
		System.out.println("MsgConverterImpl.toMessage");
		
		Msg msg = (Msg)object;
		MapMessage mapMessage = session.createMapMessage();

		mapMessage.setString("name", msg.getName());
		mapMessage.setString("value", msg.getValue());
		
		return mapMessage;
	}
}

MQController.java
  @Override
	public ModelAndView handleRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {
		
		Msg msg = new Msg();
		msg.setName("daniel yoon");
		msg.setValue("lullaby");
 		
		// 전송 
		MsgProducer producer = (MsgProducer)BeanUtils.getBean("producer");
		producer.send(msg);
		System.out.println(msg + " sent");
		
		
		ModelAndView mv = new ModelAndView();
		mv.setViewName("/index");

		return mv;
	}
실행결과

MsgConverterImpl.toMessage
MsgConverterImpl.fromMessage
handler called
name = daniel yoon, value = lullaby
name = daniel yoon, value = lullaby sent
신고


 

티스토리 툴바