2016년 8월 20일 토요일

Making a queue service using RabbitMQ

You can build a simple queue service with RabbitMQ as shown below.

- structure















- build.gradle

// Classpath of the build script itself: makes the Spring Boot Gradle plugin available.
buildscript {
    repositories { mavenCentral() }
    dependencies {
        classpath 'org.springframework.boot:spring-boot-gradle-plugin:1.4.0.RELEASE'
    }
}

apply plugin: 'java'
apply plugin: 'spring-boot'

// Repository used to resolve the project's own dependencies.
repositories { mavenCentral() }

dependencies {
    // Web starter; no version is given here.
    compile 'org.springframework.boot:spring-boot-starter-web'
    // Lombok supplies the @Slf4j annotation used by the queue classes.
    compile 'org.projectlombok:lombok:1.16.10'
    // RabbitMQ Java client (Connection, Channel, ConnectionFactory, ...).
    compile 'com.rabbitmq:amqp-client:3.6.5'
}


- application.yml

# Application name.
spring:
  application:
    name: queue-server
# HTTP port of the embedded web server.
server:
  port: 8080
# No message.queue.* keys are set in this file, so the defaults declared in the
# @Value annotations apply: queue "message_queue", host "localhost", port 5672,
# connection size 30.


- SendMessageService.java

package org.blog.test.queue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

/**
 * Publishes text messages to a RabbitMQ queue through the default exchange.
 *
 * <p>Each call to {@link #send(String)} opens its own connection and channel, declares the
 * queue, publishes one message and releases everything it opened again.
 */
@Component
@Slf4j
public class SendMessageService {

    /**
     * TCP connection timeout passed to the client; the client interprets it in milliseconds.
     * NOTE(review): 20 ms is very short for a non-local broker - confirm whether 20 s was meant.
     */
    private static final int CONNECTION_TIMEOUT_MILLIS = 20;

    /** Charset of the message body; the receiver in this project decodes with the same name. */
    private static final String MESSAGE_CHARSET = "UTF-8";

    @Value("${message.queue.name:message_queue}")
    private String messageQueue;

    @Value("${message.queue.host:localhost}")
    private String messageQueueHost;

    @Value("${message.queue.port:5672}")
    private Integer messageQueuePort;

    @Value("${message.queue.connection.size:30}")
    private Integer connectionSize;

    // Instance state: it is configured from this bean's injected properties.
    private ConnectionFactory connectionFactory;

    /** Builds the connection factory once the {@code @Value} properties have been injected. */
    @PostConstruct
    private void init() {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(messageQueueHost);
        connectionFactory.setPort(messageQueuePort);
        connectionFactory.setConnectionTimeout(CONNECTION_TIMEOUT_MILLIS);
    }

    /**
     * Publishes {@code message} to the configured queue, encoded as UTF-8.
     *
     * @param message text to publish; must not be {@code null}
     * @throws IOException if connecting, declaring the queue, publishing or closing fails
     * @throws TimeoutException if the broker does not answer in time
     */
    public void send(String message) throws IOException, TimeoutException {
        byte[] body = message.getBytes(MESSAGE_CHARSET);

        // A caller-supplied executor is not shut down by the client when the connection
        // closes, so it has to be released here on every path.
        ExecutorService executor = Executors.newFixedThreadPool(connectionSize);
        try {
            Connection connection = connectionFactory.newConnection(executor);
            boolean closedCleanly = false;
            try {
                Channel channel = connection.createChannel();
                channel.queueDeclare(messageQueue, false, false, false, null);
                channel.basicPublish("", messageQueue, null, body);
                channel.close();
                connection.close();
                closedCleanly = true;
            } finally {
                if (!closedCleanly) {
                    // abort() discards its own errors, so the original exception is not masked.
                    connection.abort();
                }
            }
        } finally {
            executor.shutdown();
        }
        log.info("send message: [{}]", message);
    }
}


- QueueMessageReciever.java

package org.blog.test.queue;

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

/**
 * Consumes messages from the configured RabbitMQ queue and logs each one.
 *
 * <p>The consumer is registered from {@code @PostConstruct}; its connection and thread pool
 * are kept for the lifetime of the bean and released from {@code @PreDestroy}.
 */
@Component
@Slf4j
public class QueueMessageReciever {

    /**
     * TCP connection timeout passed to the client; the client interprets it in milliseconds.
     * NOTE(review): 20 ms is very short for a non-local broker - confirm whether 20 s was meant.
     */
    private static final int CONNECTION_TIMEOUT_MILLIS = 20;

    @Value("${message.queue.name:message_queue}")
    private String messageQueue;

    @Value("${message.queue.host:localhost}")
    private String messageQueueHost;

    @Value("${message.queue.port:5672}")
    private Integer messageQueuePort;

    @Value("${message.queue.connection.size:30}")
    private Integer connectionSize;

    // Instance state: it is configured from this bean's injected properties.
    private ConnectionFactory connectionFactory;

    // Kept as fields so that shutdown() can release them.
    private ExecutorService consumerExecutor;
    private Connection connection;

    /**
     * Builds the connection factory from the injected properties and starts consuming.
     *
     * @throws IOException if the connection, channel or consumer cannot be set up
     * @throws TimeoutException if the broker does not answer in time
     */
    @PostConstruct
    private void init() throws IOException, TimeoutException {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(messageQueueHost);
        connectionFactory.setPort(messageQueuePort);
        connectionFactory.setConnectionTimeout(CONNECTION_TIMEOUT_MILLIS);
        consume();
    }

    /**
     * Opens the connection, declares the queue and registers an auto-acknowledging consumer
     * that logs every delivery. Anything opened is released again if setup fails part-way.
     */
    private void consume() throws IOException, TimeoutException {
        consumerExecutor = Executors.newFixedThreadPool(connectionSize);
        boolean started = false;
        try {
            connection = connectionFactory.newConnection(consumerExecutor);
            Channel channel = connection.createChannel();
            channel.queueDeclare(messageQueue, false, false, false, null);

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    log.info("Received message : [{}]", message);
                }
            };
            channel.basicConsume(messageQueue, true, consumer);
            started = true;
        } finally {
            if (!started) {
                shutdown();
            }
        }
    }

    /**
     * Closes the connection and shuts down the consumer thread pool. The client does not
     * shut down a caller-supplied executor, so it must be done explicitly. Safe to call
     * more than once.
     */
    @PreDestroy
    private void shutdown() {
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException | RuntimeException e) {
                log.warn("Failed to close RabbitMQ connection cleanly", e);
            }
            connection = null;
        }
        if (consumerExecutor != null) {
            consumerExecutor.shutdown();
            consumerExecutor = null;
        }
    }
}


- QueueApplication.java

package org.blog.test;

import org.blog.test.queue.SendMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point. From its {@link CommandLineRunner} callback it publishes three
 * sample messages through {@link SendMessageService}.
 */
@SpringBootApplication
public class QueueApplication implements CommandLineRunner {

    @Autowired
    private SendMessageService sendMessageService;

    /** Boots the Spring application. */
    public static void main(String[] args) {
        SpringApplication.run(QueueApplication.class, args);
    }

    /**
     * Sends the sample messages "1", "2" and "3" in that order; the first failure
     * propagates and stops the sequence.
     */
    @Override
    public void run(String... args) throws Exception {
        final String[] sampleMessages = {"1", "2", "3"};
        for (String sampleMessage : sampleMessages) {
            sendMessageService.send(sampleMessage);
        }
    }
}

- result

You can see the result in the RabbitMQ management console and in the server log.

댓글 없음 :

댓글 쓰기