- Structure
- build.gradle
buildscript { repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:1.4.0.RELEASE") } } apply plugin: 'java' apply plugin: 'spring-boot' repositories { mavenCentral() } dependencies { compile('org.springframework.boot:spring-boot-starter-web') compile group: 'org.projectlombok', name: 'lombok', version: '1.16.10' compile group: 'com.rabbitmq', name: 'amqp-client', version: '3.6.5' }
- application.yml
spring: application: name: queue-server server: port: 8080
- SendMessageService.java
package org.blog.test.queue.service; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import lombok.extern.slf4j.Slf4j; import org.blog.test.queue.model.TopicType; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.IOException; import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; @Component @Slf4j public class SendMessageService { @Value("${message.queue.host:localhost}") private String messageQueueHost; @Value("${message.queue.port:5672}") private Integer messageQueuePort; @Value("${message.queue.connection.size:30}") private Integer connectionSize; private static ConnectionFactory connectionFactory; private static final String TOPIC_TYPE = "topic_message"; @PostConstruct private void init() throws IOException, TimeoutException { configureConnection(); configureQueue(); } private void configureConnection() { connectionFactory = new ConnectionFactory(); connectionFactory.setHost(messageQueueHost); connectionFactory.setPort(messageQueuePort); connectionFactory.setConnectionTimeout(20); } private void configureQueue() throws IOException, TimeoutException { Connection connection = connectionFactory.newConnection(Executors.newFixedThreadPool(connectionSize)); Channel channel = connection.createChannel(); channel.exchangeDeclare(TOPIC_TYPE, "topic"); for (TopicType topicType : TopicType.values()) { channel.queueDeclare(topicType.getQueueType(), true, false, false, null); channel.queueBind(topicType.getQueueType(), TOPIC_TYPE, topicType.name()); } channel.close(); connection.close(); } public void send(TopicType topicType, String message) throws IOException, TimeoutException { Connection connection = connectionFactory.newConnection(Executors.newFixedThreadPool(connectionSize)); Channel channel = connection.createChannel(); channel.basicPublish(TOPIC_TYPE, topicType.name(), null, message.getBytes()); channel.close(); connection.close(); log.info("send topicType : [{}],message: [{}]", topicType.name(), message); } }
- QueueMessageReciever.java
package org.blog.test.queue.service; import com.rabbitmq.client.*; import lombok.extern.slf4j.Slf4j; import org.blog.test.queue.model.TopicType; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.IOException; import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; @Component @Slf4j public class QueueMessageReciever { @Value("${message.queue.host:localhost}") private String messageQueueHost; @Value("${message.queue.port:5672}") private Integer messageQueuePort; @Value("${message.queue.connection.size:30}") private Integer connectionSize; private static ConnectionFactory connectionFactory; @PostConstruct private void init() throws IOException, TimeoutException { connectionFactory = new ConnectionFactory(); connectionFactory.setHost(messageQueueHost); connectionFactory.setPort(messageQueuePort); connectionFactory.setConnectionTimeout(20); for (TopicType topicType : TopicType.values()) { consume(topicType); } } private void consume(TopicType topicType) throws IOException, TimeoutException { Connection connection = connectionFactory.newConnection(Executors.newFixedThreadPool(connectionSize)); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); log.info("Received envelop : [{}], message : [{}]", envelope.getRoutingKey(), message); } }; channel.basicConsume(topicType.getQueueType(), true, consumer); } }
- QueueApplication.java
package org.blog.test; import org.blog.test.queue.model.TopicType; import org.blog.test.queue.service.SendMessageService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.io.IOException; import java.util.concurrent.TimeoutException; @SpringBootApplication @RestController public class QueueApplication { @Autowired private SendMessageService sendMessageService; public static void main(String[] args) { SpringApplication.run(QueueApplication.class, args); } @RequestMapping("/send/{topic}/{message}") public void sendMessage(@PathVariable("topic") String topic, @PathVariable("message") String message) throws IOException, TimeoutException { sendMessageService.send(TopicType.valueOf(topic.toUpperCase()), message); } }
- TopicType.java
package org.blog.test.queue.model; import lombok.AllArgsConstructor; import lombok.Getter; @AllArgsConstructor @Getter public enum TopicType { PUSH("PUSH_QUEUE"), EMAIL("EMAIL_QUEUE"), LOG("LOG_QUEUE"); private String queueType; }
- Result
You can find the subscribed log, and graph in rabbitmq console.
댓글 없음 :
댓글 쓰기