- Structure
- build.gradle
// Build-script classpath: makes the Spring Boot Gradle plugin available to this build.
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath 'org.springframework.boot:spring-boot-gradle-plugin:1.4.0.RELEASE'
    }
}

apply plugin: 'java'
apply plugin: 'spring-boot'

// Repository used to resolve the project's own dependencies.
repositories {
    mavenCentral()
}

dependencies {
    // Web starter: declared without an explicit version.
    compile 'org.springframework.boot:spring-boot-starter-web'
    // Lombok: provides the @Slf4j / @Getter / @AllArgsConstructor annotations used by the sources.
    compile group: 'org.projectlombok', name: 'lombok', version: '1.16.10'
    // RabbitMQ Java client.
    compile group: 'com.rabbitmq', name: 'amqp-client', version: '3.6.5'
}
- application.yml
# Application name and embedded server port.
# Nesting is significant in YAML: without the indentation every key would be
# a separate top-level entry and neither property would be set.
spring:
  application:
    name: queue-server
server:
  port: 8080
- SendMessageService.java
package org.blog.test.queue.service;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.blog.test.queue.model.TopicType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
/**
 * Declares the topic exchange and its queues at startup and publishes messages to them.
 *
 * <p>Each operation opens its own connection and channel and releases them, together with
 * the thread pool handed to the connection, before returning, including when the broker
 * call fails.
 */
@Component
@Slf4j
public class SendMessageService {
    @Value("${message.queue.host:localhost}")
    private String messageQueueHost;
    @Value("${message.queue.port:5672}")
    private Integer messageQueuePort;
    @Value("${message.queue.connection.size:30}")
    private Integer connectionSize;

    private static ConnectionFactory connectionFactory;

    /** Name of the topic exchange every queue is bound to. */
    private static final String TOPIC_TYPE = "topic_message";

    /** Charset name used to encode message bodies; the consumer decodes with "UTF-8" as well. */
    private static final String MESSAGE_CHARSET = "UTF-8";

    /**
     * Connection timeout handed to the RabbitMQ client, which interprets it as milliseconds.
     * NOTE(review): 20 ms is very short for a non-local broker; confirm whether 20 seconds was intended.
     */
    private static final int CONNECTION_TIMEOUT_MILLIS = 20;

    /**
     * Configures the connection factory and declares the exchange, queues and bindings.
     *
     * @throws IOException      if the broker rejects a declaration or the connection fails
     * @throws TimeoutException if the connection cannot be established in time
     */
    @PostConstruct
    private void init() throws IOException, TimeoutException {
        configureConnection();
        configureQueue();
    }

    /** Builds the shared connection factory from the injected host and port. */
    private void configureConnection() {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(messageQueueHost);
        connectionFactory.setPort(messageQueuePort);
        connectionFactory.setConnectionTimeout(CONNECTION_TIMEOUT_MILLIS);
    }

    /**
     * Declares the topic exchange and, for every {@link TopicType}, a durable queue bound to the
     * exchange with the enum constant's name as routing key.
     *
     * @throws IOException      if a declaration or binding fails
     * @throws TimeoutException if the connection or channel close times out
     */
    private void configureQueue() throws IOException, TimeoutException {
        // Fully qualified so that the file's import list does not have to change.
        java.util.concurrent.ExecutorService executor = Executors.newFixedThreadPool(connectionSize);
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection(executor);
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(TOPIC_TYPE, "topic");
            for (TopicType topicType : TopicType.values()) {
                channel.queueDeclare(topicType.getQueueType(), true, false, false, null);
                channel.queueBind(topicType.getQueueType(), TOPIC_TYPE, topicType.name());
            }
            channel.close();
            connection.close();
        } finally {
            release(connection, executor);
        }
    }

    /**
     * Publishes {@code message} to the topic exchange using the topic's name as routing key.
     *
     * @param topicType topic whose name is used as the routing key
     * @param message   text to publish; encoded as UTF-8
     * @throws IOException      if publishing or closing the channel/connection fails
     * @throws TimeoutException if the connection cannot be established or closed in time
     */
    public void send(TopicType topicType, String message) throws IOException, TimeoutException {
        // Fully qualified so that the file's import list does not have to change.
        java.util.concurrent.ExecutorService executor = Executors.newFixedThreadPool(connectionSize);
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection(executor);
            Channel channel = connection.createChannel();
            // Explicit charset: the platform default may differ from what the consumer decodes.
            channel.basicPublish(TOPIC_TYPE, topicType.name(), null, message.getBytes(MESSAGE_CHARSET));
            channel.close();
            connection.close();
        } finally {
            release(connection, executor);
        }
        log.info("send topicType : [{}],message: [{}]", topicType.name(), message);
    }

    /**
     * Releases what a single operation acquired. A connection that is still open here means the
     * operation failed before its regular close, so it is aborted (errors during abort are
     * discarded by the client) to avoid masking the original exception. The executor is always
     * shut down because the client does not shut down executors supplied by the caller.
     */
    private static void release(Connection connection, java.util.concurrent.ExecutorService executor) {
        if (connection != null && connection.isOpen()) {
            connection.abort();
        }
        executor.shutdown();
    }
}
- QueueMessageReciever.java
package org.blog.test.queue.service;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.blog.test.queue.model.TopicType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
/**
 * Subscribes to the queue of every {@link TopicType} at startup and logs each delivery.
 *
 * <p>One connection, backed by its own fixed thread pool, is opened per topic. All of them are
 * tracked so they can be released when the bean is destroyed or when startup fails part-way.
 */
@Component
@Slf4j
public class QueueMessageReciever {
    @Value("${message.queue.host:localhost}")
    private String messageQueueHost;
    @Value("${message.queue.port:5672}")
    private Integer messageQueuePort;
    @Value("${message.queue.connection.size:30}")
    private Integer connectionSize;

    private static ConnectionFactory connectionFactory;

    /**
     * Connection timeout handed to the RabbitMQ client, which interprets it as milliseconds.
     * NOTE(review): 20 ms is very short for a non-local broker; confirm whether 20 seconds was intended.
     */
    private static final int CONNECTION_TIMEOUT_MILLIS = 20;

    /** Connections opened by {@link #consume(TopicType)}; released in {@link #shutdown()}. */
    private final List<Connection> connections = new ArrayList<Connection>();

    /** Thread pools handed to the connections; the client does not shut these down itself. */
    private final List<ExecutorService> executors = new ArrayList<ExecutorService>();

    /**
     * Configures the connection factory and starts one consumer per topic.
     *
     * @throws IOException      if a connection, channel or subscription cannot be created
     * @throws TimeoutException if a connection cannot be established in time
     */
    @PostConstruct
    private void init() throws IOException, TimeoutException {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(messageQueueHost);
        connectionFactory.setPort(messageQueuePort);
        connectionFactory.setConnectionTimeout(CONNECTION_TIMEOUT_MILLIS);

        boolean started = false;
        try {
            for (TopicType topicType : TopicType.values()) {
                consume(topicType);
            }
            started = true;
        } finally {
            // Clean up explicitly on failure: a bean whose initialization threw never
            // receives its destroy callback.
            if (!started) {
                shutdown();
            }
        }
    }

    /**
     * Opens a dedicated connection and channel for the topic's queue and registers an
     * auto-acknowledging consumer that logs the routing key and the UTF-8 decoded body.
     *
     * @param topicType topic whose queue is consumed
     * @throws IOException      if the channel or subscription cannot be created
     * @throws TimeoutException if the connection cannot be established in time
     */
    private void consume(TopicType topicType) throws IOException, TimeoutException {
        ExecutorService executor = Executors.newFixedThreadPool(connectionSize);
        // Register before connecting so the pool is released even if the connect attempt fails.
        executors.add(executor);
        Connection connection = connectionFactory.newConnection(executor);
        connections.add(connection);

        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                log.info("Received envelop : [{}], message : [{}]", envelope.getRoutingKey(), message);
            }
        };
        channel.basicConsume(topicType.getQueueType(), true, consumer);
    }

    /**
     * Releases every connection and thread pool opened by this bean. Connections are aborted
     * rather than closed so that a failure on one of them does not prevent releasing the rest.
     */
    @PreDestroy
    private void shutdown() {
        for (Connection connection : connections) {
            if (connection.isOpen()) {
                connection.abort();
            }
        }
        connections.clear();
        for (ExecutorService executor : executors) {
            executor.shutdown();
        }
        executors.clear();
    }
}
- QueueApplication.java
package org.blog.test;
import org.blog.test.queue.model.TopicType;
import org.blog.test.queue.service.SendMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Spring Boot entry point that also exposes a small HTTP endpoint for publishing messages.
 */
@SpringBootApplication
@RestController
public class QueueApplication {
    @Autowired
    private SendMessageService sendMessageService;

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(QueueApplication.class, args);
    }

    /**
     * Publishes {@code message} to the given topic.
     *
     * @param topic   case-insensitive name of a {@link TopicType} constant
     * @param message text to publish
     * @throws IOException              if publishing fails
     * @throws TimeoutException         if the broker connection times out
     * @throws IllegalArgumentException if {@code topic} does not name a {@link TopicType}
     */
    @RequestMapping("/send/{topic}/{message}")
    public void sendMessage(@PathVariable("topic") String topic, @PathVariable("message") String message) throws IOException, TimeoutException {
        // Locale.ROOT keeps the upper-casing independent of the JVM's default locale
        // (e.g. a Turkish locale would turn "email" into "EMAİL", which matches no constant).
        TopicType topicType = TopicType.valueOf(topic.toUpperCase(java.util.Locale.ROOT));
        sendMessageService.send(topicType, message);
    }
}
- TopicType.java
package org.blog.test.queue.model;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Message topics handled by the application. The constant's name is used as the routing key
 * and {@code queueType} is the name of the queue bound to it.
 */
@AllArgsConstructor
@Getter
public enum TopicType {
    PUSH("PUSH_QUEUE"),
    EMAIL("EMAIL_QUEUE"),
    LOG("LOG_QUEUE");

    /** Name of the queue for this topic; final because enum constants are shared singletons. */
    private final String queueType;
}
- Result
You can find the log of received messages in the application output, and the graphs in the RabbitMQ management console.



댓글 없음 :
댓글 쓰기