Потребительский поток RabbitMQ Java

Я новичок в rabbitMQ. Я написал java-потребителя следующим образом. Пожалуйста, посоветуйте мне, является ли это правильной реализацией Thread+RabbitMQ. Я создал три потока, которые потребляют данные из очереди и выполняют обработку.

public class TileJobs implements Runnable {
private static final String EXCHANGE_NAME = "fanout_logs";
Connection connection;
ConnectionFactory factory;
Channel channel;
String name;
int count = 0;

TileJobs(String name) throws IOException, TimeoutException {
    factory = new ConnectionFactory();
    factory.setHost("192.168.2.4");
    factory.setUsername("manish");
    factory.setPassword("mm@1234");
    connection = factory.newConnection();
    channel = connection.createChannel();
    this.name = name;
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    channel.queueDeclare("test", true, false, false, null);
    channel.queueBind("test", EXCHANGE_NAME, "");
    channel.basicQos(1);
}

@Override
public void run() {
    // TODO Auto-generated method stub
    System.out.println("inside the run");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(TileJobs.this.name);
            TileJobs.this.count = TileJobs.this.count + 1;
            System.out.println(TileJobs.this.count);
            System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    };
    try {
        channel.basicConsume("test", false, consumer);
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

public class ReceiveLogsDirect {

private static final String EXCHANGE_NAME = "fanout_logs";

public static void main(String[] argv) throws Exception {
    TileJobs consumer = new TileJobs("manish");
    Thread consumerThread = new Thread(consumer);
    consumerThread.start();
    TileJobs consumer1 = new TileJobs("manish1");
    Thread consumerThread1 = new Thread(consumer1);
    consumerThread1.start();
    TileJobs consumer2 = new TileJobs("manish2");
    Thread consumerThread2 = new Thread(consumer2);
    consumerThread2.start();

}

}

С уважением Маниш


person Manish Kumar    schedule 16.03.2018    source источник


Ответы (1)


Вы можете создать класс для подключения к серверу rabbitmq. В программе используется одно соединение с несколькими каналами для использования очереди. Обратите внимание, что один канал для одного потребителя.

person Eden    schedule 09.05.2018