2016년 8월 23일 화요일

Making queue service by using RabbitMq (publish/subscribe)

You can make publish/subscribe queue system by using rabbitmq.

- Structure















- build.gradle

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.4.0.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'spring-boot'

repositories {
    mavenCentral()
}

dependencies {
    compile('org.springframework.boot:spring-boot-starter-web')
    compile group: 'org.projectlombok', name: 'lombok', version: '1.16.10'
    compile group: 'com.rabbitmq', name: 'amqp-client', version: '3.6.5'
}

- application.yml

spring:
  application:
    name: queue-server
server:
  port: 8080

- SendMessageService.java

package org.blog.test.queue.service;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.blog.test.queue.model.TopicType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

@Component
@Slf4j
public class SendMessageService {

    @Value("${message.queue.host:localhost}")
    private String messageQueueHost;

    @Value("${message.queue.port:5672}")
    private Integer messageQueuePort;

    @Value("${message.queue.connection.size:30}")
    private Integer connectionSize;

    private static ConnectionFactory connectionFactory;

    private static final String TOPIC_TYPE = "topic_message";

    @PostConstruct
    private void init() throws IOException, TimeoutException {
        configureConnection();
        configureQueue();
    }

    private void configureConnection() {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(messageQueueHost);
        connectionFactory.setPort(messageQueuePort);
        connectionFactory.setConnectionTimeout(20);
    }

    private void configureQueue() throws IOException, TimeoutException {
        Connection connection = connectionFactory.newConnection(Executors.newFixedThreadPool(connectionSize));
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(TOPIC_TYPE, "topic");

        for (TopicType topicType : TopicType.values()) {
            channel.queueDeclare(topicType.getQueueType(), true, false, false, null);
            channel.queueBind(topicType.getQueueType(), TOPIC_TYPE, topicType.name());
        }

        channel.close();
        connection.close();
    }

    public void send(TopicType topicType, String message) throws IOException, TimeoutException {
        Connection connection = connectionFactory.newConnection(Executors.newFixedThreadPool(connectionSize));
        Channel channel = connection.createChannel();
        channel.basicPublish(TOPIC_TYPE, topicType.name(), null, message.getBytes());
        channel.close();
        connection.close();
        log.info("send topicType : [{}],message: [{}]", topicType.name(), message);
    }
}

- QueueMessageReciever.java

package org.blog.test.queue.service;

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.blog.test.queue.model.TopicType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

@Component
@Slf4j
public class QueueMessageReciever {

    @Value("${message.queue.host:localhost}")
    private String messageQueueHost;

    @Value("${message.queue.port:5672}")
    private Integer messageQueuePort;

    @Value("${message.queue.connection.size:30}")
    private Integer connectionSize;

    private static ConnectionFactory connectionFactory;

    @PostConstruct
    private void init() throws IOException, TimeoutException {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(messageQueueHost);
        connectionFactory.setPort(messageQueuePort);
        connectionFactory.setConnectionTimeout(20);

        for (TopicType topicType : TopicType.values()) {
            consume(topicType);
        }
    }

    private void consume(TopicType topicType) throws IOException, TimeoutException {
        Connection connection = connectionFactory.newConnection(Executors.newFixedThreadPool(connectionSize));
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                log.info("Received envelop : [{}], message : [{}]", envelope.getRoutingKey(), message);
            }
        };
        channel.basicConsume(topicType.getQueueType(), true, consumer);
    }
}

- QueueApplication.java

package org.blog.test;

import org.blog.test.queue.model.TopicType;
import org.blog.test.queue.service.SendMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

@SpringBootApplication
@RestController
public class QueueApplication {

    @Autowired
    private SendMessageService sendMessageService;

    public static void main(String[] args) {
        SpringApplication.run(QueueApplication.class, args);
    }

    @RequestMapping("/send/{topic}/{message}")
    public void sendMessage(@PathVariable("topic") String topic, @PathVariable("message") String message) throws IOException, TimeoutException {
        sendMessageService.send(TopicType.valueOf(topic.toUpperCase()), message);
    }
}

- TopicType.java

package org.blog.test.queue.model;

import lombok.AllArgsConstructor;
import lombok.Getter;


@AllArgsConstructor
@Getter
public enum TopicType {

    PUSH("PUSH_QUEUE"),
    EMAIL("EMAIL_QUEUE"),
    LOG("LOG_QUEUE");

    private String queueType;

}

- Result

You can find the subscribed log, and graph in rabbitmq console.

댓글 없음 :

댓글 쓰기