From 9bd815aa7efb90338da6148627b7bd19610fd1c0 Mon Sep 17 00:00:00 2001 From: Arity-T Date: Tue, 15 Apr 2025 21:39:55 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9F=D0=B5=D1=80=D0=B5=D0=BF=D0=BE=D0=BB?= =?UTF-8?q?=D0=BD=D0=B5=D0=BD=D0=B8=D0=B5=20=D0=B5=D1=81=D1=82=D1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/dev/tishenko/consumer/Recv.java | 34 +++++++++++++++++-- lab3/docker-compose.yml | 10 ++++++ .../main/java/dev/tishenko/producer/Send.java | 23 ++++++++++--- 3 files changed, 60 insertions(+), 7 deletions(-) diff --git a/lab3/consumer/app/src/main/java/dev/tishenko/consumer/Recv.java b/lab3/consumer/app/src/main/java/dev/tishenko/consumer/Recv.java index 83e83ae..eb4fecb 100644 --- a/lab3/consumer/app/src/main/java/dev/tishenko/consumer/Recv.java +++ b/lab3/consumer/app/src/main/java/dev/tishenko/consumer/Recv.java @@ -5,23 +5,51 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; +import java.util.HashMap; +import java.util.Map; + public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { + String delayEnv = System.getenv("CONSUMER_DELAY_MS"); + int delay = delayEnv != null ? Integer.parseInt(delayEnv) : 5000; + + String maxLengthEnv = System.getenv("QUEUE_MAX_LENGTH"); + int maxLength = maxLengthEnv != null ? Integer.parseInt(maxLengthEnv) : 5; + + String overflowStrategy = System.getenv("QUEUE_OVERFLOW"); + if (overflowStrategy == null) { + overflowStrategy = "drop-head"; // или "reject-publish" + } + ConnectionFactory factory = new ConnectionFactory(); factory.setHost("rabbitmq"); + Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); - channel.queueDeclare(QUEUE_NAME, false, false, false, null); - System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + Map args = new HashMap<>(); + args.put("x-max-length", maxLength); + args.put("x-overflow", overflowStrategy); + + channel.queueDeclare(QUEUE_NAME, false, false, false, args); + channel.basicQos(1); + + System.out.println(" [*] Waiting for messages..."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + e.printStackTrace(); + } + channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; - channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { + + channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } } diff --git a/lab3/docker-compose.yml b/lab3/docker-compose.yml index 84dfa73..27c7895 100644 --- a/lab3/docker-compose.yml +++ b/lab3/docker-compose.yml @@ -15,6 +15,9 @@ services: context: ./producer depends_on: - rabbitmq + - consumer + environment: + - PRODUCER_DELAY_MS=1000 networks: - rabbitnet @@ -24,6 +27,9 @@ services: context: ./producer depends_on: - rabbitmq + - consumer + environment: + - PRODUCER_DELAY_MS=1000 networks: - rabbitnet @@ -33,6 +39,10 @@ services: context: ./consumer depends_on: - rabbitmq + environment: + - CONSUMER_DELAY_MS=1000 + - QUEUE_MAX_LENGTH=10 + - QUEUE_OVERFLOW=reject-publish # drop-head, reject-publish networks: - rabbitnet diff --git a/lab3/producer/app/src/main/java/dev/tishenko/producer/Send.java b/lab3/producer/app/src/main/java/dev/tishenko/producer/Send.java index 61bc3ef..db951c1 100644 --- a/lab3/producer/app/src/main/java/dev/tishenko/producer/Send.java +++ b/lab3/producer/app/src/main/java/dev/tishenko/producer/Send.java @@ -8,14 +8,29 @@ public class Send { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { + String delayEnv = System.getenv("PRODUCER_DELAY_MS"); + int delay = delayEnv != null ? Integer.parseInt(delayEnv) : 1000; + ConnectionFactory factory = new ConnectionFactory(); factory.setHost("rabbitmq"); + try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { - channel.queueDeclare(QUEUE_NAME, false, false, false, null); - String message = "Hello World!"; - channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); - System.out.println(" [x] Sent '" + message + "'"); + + boolean durable = false; + boolean exclusive = false; + boolean autoDelete = false; + + // channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null); + channel.queueDeclarePassive(QUEUE_NAME); + + int count = 0; + while (true) { + String message = "Message #" + count++; + channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); + System.out.println(" [x] Sent '" + message + "'"); + Thread.sleep(delay); + } } } }