- Structure
- build.gradle
// Build-script classpath: makes the Spring Boot Gradle plugin available to this build.
buildscript {
    repositories { mavenCentral() }
    dependencies {
        classpath 'org.springframework.boot:spring-boot-gradle-plugin:1.4.0.RELEASE'
    }
}

apply plugin: 'java'
apply plugin: 'spring-boot'

// Repository used to resolve the project's own dependencies.
repositories { mavenCentral() }

dependencies {
    // No explicit version here; presumably supplied by the Spring Boot plugin's dependency management.
    compile 'org.springframework.boot:spring-boot-starter-web'
    compile 'org.projectlombok:lombok:1.16.10'
    compile 'com.rabbitmq:amqp-client:3.6.5'
}
- application.yml
# Spring Boot application settings for the queue demo.
spring:
  application:
    # Name of this application.
    name: queue-server
server:
  # Port of the server (presumably the embedded web server that exposes the /send endpoint).
  port: 8080
- SendMessageService.java
package org.blog.test.queue.service;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.blog.test.queue.model.TopicType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
/**
 * Publishes text messages to RabbitMQ through the {@code topic_message} topic exchange.
 *
 * <p>On startup the exchange is declared, and one durable queue per {@link TopicType} is declared
 * and bound to it with the enum constant name as routing key. Every call to
 * {@link #send(TopicType, String)} opens a short-lived connection, publishes a single message and
 * releases the connection and its worker pool again.
 */
@Component
@Slf4j
public class SendMessageService {
    /** Name of the topic exchange every message is published to. */
    private static final String TOPIC_TYPE = "topic_message";
    /** Charset used to encode message bodies; QueueMessageReciever decodes bodies as UTF-8. */
    private static final String MESSAGE_CHARSET = "UTF-8";

    @Value("${message.queue.host:localhost}")
    private String messageQueueHost;
    @Value("${message.queue.port:5672}")
    private Integer messageQueuePort;
    /** Size of the worker pool handed to each connection. */
    @Value("${message.queue.connection.size:30}")
    private Integer connectionSize;

    /** Per-bean factory; configured once in {@link #init()} from the injected properties. */
    private ConnectionFactory connectionFactory;

    /**
     * Configures the connection factory and declares the exchange, queues and bindings.
     *
     * @throws IOException      if the broker cannot be reached or a declaration fails
     * @throws TimeoutException if establishing the connection times out
     */
    @PostConstruct
    private void init() throws IOException, TimeoutException {
        configureConnection();
        configureQueue();
    }

    /** Builds the connection factory from the injected host and port. */
    private void configureConnection() {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(messageQueueHost);
        connectionFactory.setPort(messageQueuePort);
        // NOTE(review): the RabbitMQ client reads this value as milliseconds; 20 ms is very
        // tight for anything but a local broker - confirm whether 20 seconds was intended.
        connectionFactory.setConnectionTimeout(20);
    }

    /**
     * Declares the topic exchange and, for every {@link TopicType}, a durable queue bound to the
     * exchange with the constant name as routing key.
     *
     * @throws IOException      if a declaration or binding fails
     * @throws TimeoutException if establishing or closing the connection times out
     */
    private void configureQueue() throws IOException, TimeoutException {
        ExecutorService executor = Executors.newFixedThreadPool(connectionSize);
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection(executor);
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(TOPIC_TYPE, "topic");
            for (TopicType topicType : TopicType.values()) {
                channel.queueDeclare(topicType.getQueueType(), true, false, false, null);
                channel.queueBind(topicType.getQueueType(), TOPIC_TYPE, topicType.name());
            }
            channel.close();
        } finally {
            release(connection, executor);
        }
    }

    /**
     * Publishes {@code message} to the exchange using the name of {@code topicType} as routing key.
     *
     * @param topicType topic whose constant name is used as routing key
     * @param message   text payload; encoded as UTF-8
     * @throws IOException      if the connection cannot be opened or publishing fails
     * @throws TimeoutException if establishing or closing the connection times out
     */
    public void send(TopicType topicType, String message) throws IOException, TimeoutException {
        ExecutorService executor = Executors.newFixedThreadPool(connectionSize);
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection(executor);
            Channel channel = connection.createChannel();
            // Explicit charset: the platform default may differ from the UTF-8 used when decoding.
            channel.basicPublish(TOPIC_TYPE, topicType.name(), null, message.getBytes(MESSAGE_CHARSET));
            channel.close();
        } finally {
            release(connection, executor);
        }
        log.info("send topicType : [{}],message: [{}]", topicType.name(), message);
    }

    /**
     * Closes the connection if it is still open and always shuts the worker pool down.
     * The client library does not shut down an executor supplied by the caller, so without
     * this every call would leave a thread pool behind.
     */
    private static void release(Connection connection, ExecutorService executor) throws IOException {
        try {
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } finally {
            executor.shutdown();
        }
    }
}
- QueueMessageReciever.java
package org.blog.test.queue.service;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.blog.test.queue.model.TopicType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
/**
 * Subscribes to the queue of every {@link TopicType} on startup and logs each delivered message.
 *
 * <p>One connection with its own worker pool is opened per queue and kept open so that
 * deliveries continue to arrive. Messages are consumed with automatic acknowledgement.
 */
@Component
@Slf4j
public class QueueMessageReciever {
    @Value("${message.queue.host:localhost}")
    private String messageQueueHost;
    @Value("${message.queue.port:5672}")
    private Integer messageQueuePort;
    /** Size of the worker pool handed to each consumer connection. */
    @Value("${message.queue.connection.size:30}")
    private Integer connectionSize;

    /** Per-bean factory; configured once in {@link #init()} from the injected properties. */
    private ConnectionFactory connectionFactory;

    /**
     * Configures the connection factory and starts one consumer per {@link TopicType}.
     *
     * @throws IOException      if the broker cannot be reached or a subscription fails
     * @throws TimeoutException if establishing a connection times out
     */
    @PostConstruct
    private void init() throws IOException, TimeoutException {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(messageQueueHost);
        connectionFactory.setPort(messageQueuePort);
        // NOTE(review): the RabbitMQ client reads this value as milliseconds; 20 ms is very
        // tight for anything but a local broker - confirm whether 20 seconds was intended.
        connectionFactory.setConnectionTimeout(20);
        for (TopicType topicType : TopicType.values()) {
            consume(topicType);
        }
    }

    /**
     * Opens a dedicated connection and registers a logging consumer on the queue of
     * {@code topicType}.
     *
     * @param topicType topic whose queue is consumed
     * @throws IOException      if the channel cannot be created or the subscription fails
     * @throws TimeoutException if establishing the connection times out
     */
    private void consume(TopicType topicType) throws IOException, TimeoutException {
        String queue = topicType.getQueueType();
        Connection connection = connectionFactory.newConnection(Executors.newFixedThreadPool(connectionSize));
        boolean subscribed = false;
        try {
            Channel channel = connection.createChannel();
            // Declare the queue here as well: this bean may be initialised before
            // SendMessageService has declared it, and consuming from a missing queue fails.
            // The arguments must stay identical to the declaration in SendMessageService.
            channel.queueDeclare(queue, true, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    log.info("Received envelop : [{}], message : [{}]", envelope.getRoutingKey(), message);
                }
            };
            // Second argument enables automatic acknowledgement of deliveries.
            channel.basicConsume(queue, true, consumer);
            subscribed = true;
        } finally {
            // Do not leave a half-initialised connection behind when the subscription failed.
            if (!subscribed && connection.isOpen()) {
                connection.close();
            }
        }
    }
}
- QueueApplication.java
package org.blog.test;
import org.blog.test.queue.model.TopicType;
import org.blog.test.queue.service.SendMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Application entry point that also exposes a small REST endpoint for publishing test messages.
 */
@SpringBootApplication
@RestController
public class QueueApplication {
    @Autowired
    private SendMessageService sendMessageService;

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(QueueApplication.class, args);
    }

    /**
     * Publishes {@code message} under the given topic.
     *
     * @param topic   name of a {@link TopicType} constant, matched case-insensitively
     * @param message text to publish
     * @throws IllegalArgumentException if {@code topic} does not name a {@link TopicType}
     * @throws IOException              if publishing fails
     * @throws TimeoutException         if the broker connection times out
     */
    @RequestMapping("/send/{topic}/{message}")
    public void sendMessage(@PathVariable("topic") String topic, @PathVariable("message") String message) throws IOException, TimeoutException {
        sendMessageService.send(resolveTopic(topic), message);
    }

    /**
     * Finds the {@link TopicType} whose name equals {@code topic}, ignoring case.
     * Uses {@code equalsIgnoreCase} rather than {@code toUpperCase()} so that the lookup does
     * not depend on the JVM's default locale.
     */
    private static TopicType resolveTopic(String topic) {
        for (TopicType candidate : TopicType.values()) {
            if (candidate.name().equalsIgnoreCase(topic)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Unknown topic: " + topic);
    }
}
- TopicType.java
package org.blog.test.queue.model;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Message categories supported by the demo.
 *
 * <p>The constant name is used as the routing key on the topic exchange, and
 * {@link #getQueueType()} gives the name of the queue bound to that routing key.
 */
@AllArgsConstructor
@Getter
public enum TopicType {
    PUSH("PUSH_QUEUE"),
    EMAIL("EMAIL_QUEUE"),
    LOG("LOG_QUEUE");

    /** Name of the RabbitMQ queue for this topic; final because enum state is shared globally. */
    private final String queueType;
}
- Result
After sending a message, you can see the received message in the application log, and the corresponding graphs in the RabbitMQ management console.



댓글 없음 :
댓글 쓰기