- structure
- build.gradle
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:1.4.0.RELEASE")
}
}
apply plugin: 'java'
apply plugin: 'spring-boot'
repositories {
mavenCentral()
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-web')
compile group: 'org.projectlombok', name: 'lombok', version: '1.16.10'
compile group: 'com.rabbitmq', name: 'amqp-client', version: '3.6.5'
}
- application.yml
spring:
application:
name: queue-server
server:
port: 8080
- SendMessageService.java
package org.blog.test.queue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
@Component
@Slf4j
public class SendMessageService {
@Value("${message.queue.name:message_queue}")
private String messageQueue;
@Value("${message.queue.host:localhost}")
private String messageQueueHost;
@Value("${message.queue.port:5672}")
private Integer messageQueuePort;
@Value("${message.queue.connection.size:30}")
private Integer connectionSize;
private static ConnectionFactory connectionFactory;
@PostConstruct
private void init() {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost(messageQueueHost);
connectionFactory.setPort(messageQueuePort);
connectionFactory.setConnectionTimeout(20);
}
public void send(String message) throws IOException, TimeoutException {
Connection connection = connectionFactory.newConnection(Executors.newFixedThreadPool(connectionSize));
Channel channel = connection.createChannel();
channel.queueDeclare(messageQueue, false, false, false, null);
channel.basicPublish("", messageQueue, null, message.getBytes());
channel.close();
connection.close();
log.info("send message: [{}]", message);
}
}
- QueueMessageReciever.java
package org.blog.test.queue;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
@Component
@Slf4j
public class QueueMessageReciever {
@Value("${message.queue.name:message_queue}")
private String messageQueue;
@Value("${message.queue.host:localhost}")
private String messageQueueHost;
@Value("${message.queue.port:5672}")
private Integer messageQueuePort;
@Value("${message.queue.connection.size:30}")
private Integer connectionSize;
private static ConnectionFactory connectionFactory;
@PostConstruct
private void init() throws IOException, TimeoutException {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost(messageQueueHost);
connectionFactory.setPort(messageQueuePort);
connectionFactory.setConnectionTimeout(20);
consume();
}
private void consume() throws IOException, TimeoutException {
Connection connection = connectionFactory.newConnection(Executors.newFixedThreadPool(connectionSize));
Channel channel = connection.createChannel();
channel.queueDeclare(messageQueue, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
log.info("Received message : [{}]", message);
}
};
channel.basicConsume(messageQueue, true, consumer);
}
}
- QueueApplication.java
package org.blog.test;
import org.blog.test.queue.SendMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class QueueApplication implements CommandLineRunner {
@Autowired
private SendMessageService sendMessageService;
public static void main(String[] args) {
SpringApplication.run(QueueApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
sendMessageService.send("1");
sendMessageService.send("2");
sendMessageService.send("3");
}
}
- result
You can find the result in rabbitmq console and server log.


댓글 없음 :
댓글 쓰기