spring cloud stream rabbitmq

    xiaoxiao2023-10-29  130

    <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <version>1.3.4.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> </dependencies>

    生产者:

    server.port=8001 server.servlet.context-path=/producer spring.application.name=producer spring.cloud.stream.bindings.output_channel.destination=exchange-3 spring.cloud.stream.bindings.output_channel.group=queue-3 spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster spring.cloud.stream.binders.rabbit_cluster.type=rabbit spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=localhost:5672 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=longlong spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password= spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/ public interface Barista { String OUTPUT_CHANNEL = "output_channel"; @Output(Barista.OUTPUT_CHANNEL) MessageChannel logoutput(); } @EnableBinding(Barista.class) @Service public class RabbitmqSender { @Autowired private Barista barista; // 发送消息 public String sendMessage(Object message, Map<String, Object> properties) throws Exception { try{ MessageHeaders mhs = new MessageHeaders(properties); Message msg = MessageBuilder.createMessage(message, mhs); boolean sendStatus = barista.logoutput().send(msg); System.err.println("--------------sending -------------------"); System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus); }catch (Exception e){ System.err.println("-------------error-------------"); e.printStackTrace(); throw new RuntimeException(e.getMessage()); } return null; } } @Autowired private RabbitmqSender rabbitmqSender; @Test public void sendMessageTest1() { for(int i = 0; i < 1; i ++){ try { Map<String, Object> properties = new HashMap<String, Object>(); properties.put("SERIAL_NUMBER", "12345"); properties.put("BANK_NUMBER", "abc"); properties.put("PLAT_SEND_TIME", DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS")); rabbitmqSender.sendMessage("Hello, I am amqp sender num :" + i, properties); } catch (Exception e) { System.out.println("--------error-------"); e.printStackTrace(); } } }

    消费者:

    server.port=8002 server.context-path=/consumer spring.application.name=consumer spring.cloud.stream.bindings.input_channel.destination=exchange-3 spring.cloud.stream.bindings.input_channel.group=queue-3 spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster spring.cloud.stream.bindings.input_channel.consumer.concurrency=1 spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000 spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5 spring.cloud.stream.binders.rabbit_cluster.type=rabbit spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=localhost:5672 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=longlong spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password= spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/ public interface Barista { String INPUT_CHANNEL = "input_channel"; @Input(Barista.INPUT_CHANNEL) SubscribableChannel loginput(); } @EnableBinding(Barista.class) @Service public class RabbitmqReceiver { @StreamListener(Barista.INPUT_CHANNEL) public void receiver(Message message) throws Exception { Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL); Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); System.out.println("Input Stream 1 接受数据:" + message); System.out.println("消费完毕------------"); channel.basicAck(deliveryTag, false); } }
    最新回复(0)