RabbitMQ 基于AMQP协议的Java案例+代码

应用服务 12/30 阅读 730 views次 人气 0
摘要:

1. 协议说明

rabbitmq遵循 Advanced Message Queuing Protocol(AMQP)协议。

--------------------------------------------------------------------------------------------------------------------------------------

其中amqp协议1.0可以参考我的资源:

http://download.csdn.net/detail/huyangyamin/9725613

2. spring 客户端

spring java客户端使用 com.rabbitmq.client.impl.FrameHandler处理底层二进制协议(binary protocol)的传输。

实际实现类为:

package com.rabbitmq.client.impl;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;

import com.rabbitmq.client.AMQP;

/** * A socket-based frame handler. */

public class SocketFrameHandler implements FrameHandler {
    /** The underlying socket */
    private final Socket _socket;

    /** Socket's inputstream - data from the broker - synchronized on */
    private final DataInputStream _inputStream;

    /** Socket's outputstream - data to the broker - synchronized on */
    private final DataOutputStream _outputStream;

    /** Time to linger before closing the socket forcefully. */
    public static final int SOCKET_CLOSING_TIMEOUT = 1;

    /** * @param socket the socket to use */
    public SocketFrameHandler(Socket socket) throws IOException {
        _socket = socket;

        _inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
        _outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
    }

--------------------------------------------------------------------------------------------------------------------------------------

可以看到SocketFrameHandler持有一个Socket连接,并打开了tcp的read和write IO输入输出流。

3. tcp连接心跳维持

为了维护tcp连接的状态,rabbitmq没有选择使用tcp默认的keepalive机制,而是自己实现了一套心跳机制(在OSI应用层)

rabbitmq客户端专门起了一个定时线程池(ScheduledThreadPoolExecutor)来发送心跳包:

// The contents of this file are subject to the Mozilla Public License
// Version 1.1 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at http://www.mozilla.org/MPL/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is GoPivotal, Inc.
// Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
//


package com.rabbitmq.client.impl;

import com.rabbitmq.client.AMQP;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledFuture;
import java.io.IOException;

import static java.util.concurrent.TimeUnit.SECONDS;

/** * Manages heartbeat sending for a {@link AMQConnection}. * <p/> * Heartbeats are sent in a dedicated thread that is separate * from the main loop thread used for the connection. */
final class HeartbeatSender {

    private final Object monitor = new Object();

    private final FrameHandler frameHandler;
    private final ThreadFactory threadFactory;

    private ScheduledExecutorService executor;

    private ScheduledFuture<?> future;

    private boolean shutdown = false;

    private volatile long lastActivityTime;

    HeartbeatSender(FrameHandler frameHandler, ThreadFactory threadFactory) {
        this.frameHandler = frameHandler;
        this.threadFactory = threadFactory;
    }

    public void signalActivity() {
        this.lastActivityTime = System.nanoTime();
    }

    /** * Sets the heartbeat in seconds. */
    public void setHeartbeat(int heartbeatSeconds) {
        synchronized(this.monitor) {
            if(this.shutdown) {
                return;
            }

            // cancel any existing heartbeat task
            if(this.future != null) {
                this.future.cancel(true);
                this.future = null;
            }

            if (heartbeatSeconds > 0) {
                // wake every heartbeatSeconds / 2 to avoid the worst case
                // where the last activity comes just after the last heartbeat
                long interval = SECONDS.toNanos(heartbeatSeconds) / 2;
                ScheduledExecutorService executor = createExecutorIfNecessary();
                Runnable task = new HeartbeatRunnable(interval);
                this.future = executor.scheduleAtFixedRate(
                    task, interval, interval, TimeUnit.NANOSECONDS);
            }
        }
    }

    private ScheduledExecutorService createExecutorIfNecessary() {
        synchronized (this.monitor) {
            if (this.executor == null) {
                this.executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
            }
            return this.executor;
        }
    }

    /** * Shutdown the heartbeat process, if any. */
    public void shutdown() {
        ExecutorService executorToShutdown = null;
        synchronized (this.monitor) {
            if (this.future != null) {
                this.future.cancel(true);
                this.future = null;
            }

            if (this.executor != null) {
                // to be safe, we shouldn't call shutdown holding the
                // monitor.
                executorToShutdown = this.executor;

                this.shutdown = true;
                this.executor = null;
            }
        }
        if(executorToShutdown != null) {
            executorToShutdown.shutdown();
        }
    }

    private final class HeartbeatRunnable implements Runnable {

        private final long heartbeatNanos;

        private HeartbeatRunnable(long heartbeatNanos) {
            this.heartbeatNanos = heartbeatNanos;
        }

        public void run() {
            try {
                long now = System.nanoTime();

                if (now > (lastActivityTime + this.heartbeatNanos)) {
                    frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
                    frameHandler.flush();
                }
            } catch (IOException e) {
                // ignore
            }
        }
    }
}

--------------------------------------------------------------------------------------------------------------------------------------

小奋斗文章

--------------------------------------------------------------------------------------------------------------------------------------

评论

该文章不支持评论!

分享到: