848 lines
23 KiB
Go
848 lines
23 KiB
Go
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
// Source code and contact info at http://github.com/streadway/amqp
|
|
|
|
package amqp
|
|
|
|
import (
|
|
"bufio"
|
|
"crypto/tls"
|
|
"io"
|
|
"net"
|
|
"reflect"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
maxChannelMax = (2 << 15) - 1
|
|
|
|
defaultHeartbeat = 10 * time.Second
|
|
defaultConnectionTimeout = 30 * time.Second
|
|
defaultProduct = "https://github.com/streadway/amqp"
|
|
defaultVersion = "β"
|
|
// Safer default that makes channel leaks a lot easier to spot
|
|
// before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593.
|
|
defaultChannelMax = (2 << 10) - 1
|
|
defaultLocale = "en_US"
|
|
)
|
|
|
|
// Config is used in DialConfig and Open to specify the desired tuning
|
|
// parameters used during a connection open handshake. The negotiated tuning
|
|
// will be stored in the returned connection's Config field.
|
|
type Config struct {
|
|
// The SASL mechanisms to try in the client request, and the successful
|
|
// mechanism used on the Connection object.
|
|
// If SASL is nil, PlainAuth from the URL is used.
|
|
SASL []Authentication
|
|
|
|
// Vhost specifies the namespace of permissions, exchanges, queues and
|
|
// bindings on the server. Dial sets this to the path parsed from the URL.
|
|
Vhost string
|
|
|
|
ChannelMax int // 0 max channels means 2^16 - 1
|
|
FrameSize int // 0 max bytes means unlimited
|
|
Heartbeat time.Duration // less than 1s uses the server's interval
|
|
|
|
// TLSClientConfig specifies the client configuration of the TLS connection
|
|
// when establishing a tls transport.
|
|
// If the URL uses an amqps scheme, then an empty tls.Config with the
|
|
// ServerName from the URL is used.
|
|
TLSClientConfig *tls.Config
|
|
|
|
// Properties is table of properties that the client advertises to the server.
|
|
// This is an optional setting - if the application does not set this,
|
|
// the underlying library will use a generic set of client properties.
|
|
Properties Table
|
|
|
|
// Connection locale that we expect to always be en_US
|
|
// Even though servers must return it as per the AMQP 0-9-1 spec,
|
|
// we are not aware of it being used other than to satisfy the spec requirements
|
|
Locale string
|
|
|
|
// Dial returns a net.Conn prepared for a TLS handshake with TSLClientConfig,
|
|
// then an AMQP connection handshake.
|
|
// If Dial is nil, net.DialTimeout with a 30s connection and 30s deadline is
|
|
// used during TLS and AMQP handshaking.
|
|
Dial func(network, addr string) (net.Conn, error)
|
|
}
|
|
|
|
// Connection manages the serialization and deserialization of frames from IO
|
|
// and dispatches the frames to the appropriate channel. All RPC methods and
|
|
// asynchronous Publishing, Delivery, Ack, Nack and Return messages are
|
|
// multiplexed on this channel. There must always be active receivers for
|
|
// every asynchronous message on this connection.
|
|
type Connection struct {
|
|
destructor sync.Once // shutdown once
|
|
sendM sync.Mutex // conn writer mutex
|
|
m sync.Mutex // struct field mutex
|
|
|
|
conn io.ReadWriteCloser
|
|
|
|
rpc chan message
|
|
writer *writer
|
|
sends chan time.Time // timestamps of each frame sent
|
|
deadlines chan readDeadliner // heartbeater updates read deadlines
|
|
|
|
allocator *allocator // id generator valid after openTune
|
|
channels map[uint16]*Channel
|
|
|
|
noNotify bool // true when we will never notify again
|
|
closes []chan *Error
|
|
blocks []chan Blocking
|
|
|
|
errors chan *Error
|
|
|
|
Config Config // The negotiated Config after connection.open
|
|
|
|
Major int // Server's major version
|
|
Minor int // Server's minor version
|
|
Properties Table // Server properties
|
|
Locales []string // Server locales
|
|
|
|
closed int32 // Will be 1 if the connection is closed, 0 otherwise. Should only be accessed as atomic
|
|
}
|
|
|
|
type readDeadliner interface {
|
|
SetReadDeadline(time.Time) error
|
|
}
|
|
|
|
// DefaultDial establishes a connection when config.Dial is not provided
|
|
func DefaultDial(connectionTimeout time.Duration) func(network, addr string) (net.Conn, error) {
|
|
return func(network, addr string) (net.Conn, error) {
|
|
conn, err := net.DialTimeout(network, addr, connectionTimeout)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Heartbeating hasn't started yet, don't stall forever on a dead server.
|
|
// A deadline is set for TLS and AMQP handshaking. After AMQP is established,
|
|
// the deadline is cleared in openComplete.
|
|
if err := conn.SetDeadline(time.Now().Add(connectionTimeout)); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return conn, nil
|
|
}
|
|
}
|
|
|
|
// Dial accepts a string in the AMQP URI format and returns a new Connection
|
|
// over TCP using PlainAuth. Defaults to a server heartbeat interval of 10
|
|
// seconds and sets the handshake deadline to 30 seconds. After handshake,
|
|
// deadlines are cleared.
|
|
//
|
|
// Dial uses the zero value of tls.Config when it encounters an amqps://
|
|
// scheme. It is equivalent to calling DialTLS(amqp, nil).
|
|
func Dial(url string) (*Connection, error) {
|
|
return DialConfig(url, Config{
|
|
Heartbeat: defaultHeartbeat,
|
|
Locale: defaultLocale,
|
|
})
|
|
}
|
|
|
|
// DialTLS accepts a string in the AMQP URI format and returns a new Connection
|
|
// over TCP using PlainAuth. Defaults to a server heartbeat interval of 10
|
|
// seconds and sets the initial read deadline to 30 seconds.
|
|
//
|
|
// DialTLS uses the provided tls.Config when encountering an amqps:// scheme.
|
|
func DialTLS(url string, amqps *tls.Config) (*Connection, error) {
|
|
return DialConfig(url, Config{
|
|
Heartbeat: defaultHeartbeat,
|
|
TLSClientConfig: amqps,
|
|
Locale: defaultLocale,
|
|
})
|
|
}
|
|
|
|
// DialConfig accepts a string in the AMQP URI format and a configuration for
|
|
// the transport and connection setup, returning a new Connection. Defaults to
|
|
// a server heartbeat interval of 10 seconds and sets the initial read deadline
|
|
// to 30 seconds.
|
|
func DialConfig(url string, config Config) (*Connection, error) {
|
|
var err error
|
|
var conn net.Conn
|
|
|
|
uri, err := ParseURI(url)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if config.SASL == nil {
|
|
config.SASL = []Authentication{uri.PlainAuth()}
|
|
}
|
|
|
|
if config.Vhost == "" {
|
|
config.Vhost = uri.Vhost
|
|
}
|
|
|
|
addr := net.JoinHostPort(uri.Host, strconv.FormatInt(int64(uri.Port), 10))
|
|
|
|
dialer := config.Dial
|
|
if dialer == nil {
|
|
dialer = DefaultDial(defaultConnectionTimeout)
|
|
}
|
|
|
|
conn, err = dialer("tcp", addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if uri.Scheme == "amqps" {
|
|
if config.TLSClientConfig == nil {
|
|
config.TLSClientConfig = new(tls.Config)
|
|
}
|
|
|
|
// If ServerName has not been specified in TLSClientConfig,
|
|
// set it to the URI host used for this connection.
|
|
if config.TLSClientConfig.ServerName == "" {
|
|
config.TLSClientConfig.ServerName = uri.Host
|
|
}
|
|
|
|
client := tls.Client(conn, config.TLSClientConfig)
|
|
if err := client.Handshake(); err != nil {
|
|
|
|
conn.Close()
|
|
return nil, err
|
|
}
|
|
|
|
conn = client
|
|
}
|
|
|
|
return Open(conn, config)
|
|
}
|
|
|
|
/*
|
|
Open accepts an already established connection, or other io.ReadWriteCloser as
|
|
a transport. Use this method if you have established a TLS connection or wish
|
|
to use your own custom transport.
|
|
|
|
*/
|
|
func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) {
|
|
c := &Connection{
|
|
conn: conn,
|
|
writer: &writer{bufio.NewWriter(conn)},
|
|
channels: make(map[uint16]*Channel),
|
|
rpc: make(chan message),
|
|
sends: make(chan time.Time),
|
|
errors: make(chan *Error, 1),
|
|
deadlines: make(chan readDeadliner, 1),
|
|
}
|
|
go c.reader(conn)
|
|
return c, c.open(config)
|
|
}
|
|
|
|
/*
|
|
LocalAddr returns the local TCP peer address, or ":0" (the zero value of net.TCPAddr)
|
|
as a fallback default value if the underlying transport does not support LocalAddr().
|
|
*/
|
|
func (c *Connection) LocalAddr() net.Addr {
|
|
if conn, ok := c.conn.(interface {
|
|
LocalAddr() net.Addr
|
|
}); ok {
|
|
return conn.LocalAddr()
|
|
}
|
|
return &net.TCPAddr{}
|
|
}
|
|
|
|
// ConnectionState returns basic TLS details of the underlying transport.
|
|
// Returns a zero value when the underlying connection does not implement
|
|
// ConnectionState() tls.ConnectionState.
|
|
func (c *Connection) ConnectionState() tls.ConnectionState {
|
|
if conn, ok := c.conn.(interface {
|
|
ConnectionState() tls.ConnectionState
|
|
}); ok {
|
|
return conn.ConnectionState()
|
|
}
|
|
return tls.ConnectionState{}
|
|
}
|
|
|
|
/*
|
|
NotifyClose registers a listener for close events either initiated by an error
|
|
accompanying a connection.close method or by a normal shutdown.
|
|
|
|
On normal shutdowns, the chan will be closed.
|
|
|
|
To reconnect after a transport or protocol error, register a listener here and
|
|
re-run your setup process.
|
|
|
|
*/
|
|
func (c *Connection) NotifyClose(receiver chan *Error) chan *Error {
|
|
c.m.Lock()
|
|
defer c.m.Unlock()
|
|
|
|
if c.noNotify {
|
|
close(receiver)
|
|
} else {
|
|
c.closes = append(c.closes, receiver)
|
|
}
|
|
|
|
return receiver
|
|
}
|
|
|
|
/*
|
|
NotifyBlocked registers a listener for RabbitMQ specific TCP flow control
|
|
method extensions connection.blocked and connection.unblocked. Flow control is
|
|
active with a reason when Blocking.Blocked is true. When a Connection is
|
|
blocked, all methods will block across all connections until server resources
|
|
become free again.
|
|
|
|
This optional extension is supported by the server when the
|
|
"connection.blocked" server capability key is true.
|
|
|
|
*/
|
|
func (c *Connection) NotifyBlocked(receiver chan Blocking) chan Blocking {
|
|
c.m.Lock()
|
|
defer c.m.Unlock()
|
|
|
|
if c.noNotify {
|
|
close(receiver)
|
|
} else {
|
|
c.blocks = append(c.blocks, receiver)
|
|
}
|
|
|
|
return receiver
|
|
}
|
|
|
|
/*
|
|
Close requests and waits for the response to close the AMQP connection.
|
|
|
|
It's advisable to use this message when publishing to ensure all kernel buffers
|
|
have been flushed on the server and client before exiting.
|
|
|
|
An error indicates that server may not have received this request to close but
|
|
the connection should be treated as closed regardless.
|
|
|
|
After returning from this call, all resources associated with this connection,
|
|
including the underlying io, Channels, Notify listeners and Channel consumers
|
|
will also be closed.
|
|
*/
|
|
func (c *Connection) Close() error {
|
|
if c.IsClosed() {
|
|
return ErrClosed
|
|
}
|
|
|
|
defer c.shutdown(nil)
|
|
return c.call(
|
|
&connectionClose{
|
|
ReplyCode: replySuccess,
|
|
ReplyText: "kthxbai",
|
|
},
|
|
&connectionCloseOk{},
|
|
)
|
|
}
|
|
|
|
func (c *Connection) closeWith(err *Error) error {
|
|
if c.IsClosed() {
|
|
return ErrClosed
|
|
}
|
|
|
|
defer c.shutdown(err)
|
|
return c.call(
|
|
&connectionClose{
|
|
ReplyCode: uint16(err.Code),
|
|
ReplyText: err.Reason,
|
|
},
|
|
&connectionCloseOk{},
|
|
)
|
|
}
|
|
|
|
// IsClosed returns true if the connection is marked as closed, otherwise false
|
|
// is returned.
|
|
func (c *Connection) IsClosed() bool {
|
|
return (atomic.LoadInt32(&c.closed) == 1)
|
|
}
|
|
|
|
func (c *Connection) send(f frame) error {
|
|
if c.IsClosed() {
|
|
return ErrClosed
|
|
}
|
|
|
|
c.sendM.Lock()
|
|
err := c.writer.WriteFrame(f)
|
|
c.sendM.Unlock()
|
|
|
|
if err != nil {
|
|
// shutdown could be re-entrant from signaling notify chans
|
|
go c.shutdown(&Error{
|
|
Code: FrameError,
|
|
Reason: err.Error(),
|
|
})
|
|
} else {
|
|
// Broadcast we sent a frame, reducing heartbeats, only
|
|
// if there is something that can receive - like a non-reentrant
|
|
// call or if the heartbeater isn't running
|
|
select {
|
|
case c.sends <- time.Now():
|
|
default:
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (c *Connection) shutdown(err *Error) {
|
|
atomic.StoreInt32(&c.closed, 1)
|
|
|
|
c.destructor.Do(func() {
|
|
c.m.Lock()
|
|
defer c.m.Unlock()
|
|
|
|
if err != nil {
|
|
for _, c := range c.closes {
|
|
c <- err
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
c.errors <- err
|
|
}
|
|
// Shutdown handler goroutine can still receive the result.
|
|
close(c.errors)
|
|
|
|
for _, c := range c.closes {
|
|
close(c)
|
|
}
|
|
|
|
for _, c := range c.blocks {
|
|
close(c)
|
|
}
|
|
|
|
// Shutdown the channel, but do not use closeChannel() as it calls
|
|
// releaseChannel() which requires the connection lock.
|
|
//
|
|
// Ranging over c.channels and calling releaseChannel() that mutates
|
|
// c.channels is racy - see commit 6063341 for an example.
|
|
for _, ch := range c.channels {
|
|
ch.shutdown(err)
|
|
}
|
|
|
|
c.conn.Close()
|
|
|
|
c.channels = map[uint16]*Channel{}
|
|
c.allocator = newAllocator(1, c.Config.ChannelMax)
|
|
c.noNotify = true
|
|
})
|
|
}
|
|
|
|
// All methods sent to the connection channel should be synchronous so we
|
|
// can handle them directly without a framing component
|
|
func (c *Connection) demux(f frame) {
|
|
if f.channel() == 0 {
|
|
c.dispatch0(f)
|
|
} else {
|
|
c.dispatchN(f)
|
|
}
|
|
}
|
|
|
|
func (c *Connection) dispatch0(f frame) {
|
|
switch mf := f.(type) {
|
|
case *methodFrame:
|
|
switch m := mf.Method.(type) {
|
|
case *connectionClose:
|
|
// Send immediately as shutdown will close our side of the writer.
|
|
c.send(&methodFrame{
|
|
ChannelId: 0,
|
|
Method: &connectionCloseOk{},
|
|
})
|
|
|
|
c.shutdown(newError(m.ReplyCode, m.ReplyText))
|
|
case *connectionBlocked:
|
|
for _, c := range c.blocks {
|
|
c <- Blocking{Active: true, Reason: m.Reason}
|
|
}
|
|
case *connectionUnblocked:
|
|
for _, c := range c.blocks {
|
|
c <- Blocking{Active: false}
|
|
}
|
|
default:
|
|
c.rpc <- m
|
|
}
|
|
case *heartbeatFrame:
|
|
// kthx - all reads reset our deadline. so we can drop this
|
|
default:
|
|
// lolwat - channel0 only responds to methods and heartbeats
|
|
c.closeWith(ErrUnexpectedFrame)
|
|
}
|
|
}
|
|
|
|
func (c *Connection) dispatchN(f frame) {
|
|
c.m.Lock()
|
|
channel := c.channels[f.channel()]
|
|
c.m.Unlock()
|
|
|
|
if channel != nil {
|
|
channel.recv(channel, f)
|
|
} else {
|
|
c.dispatchClosed(f)
|
|
}
|
|
}
|
|
|
|
// section 2.3.7: "When a peer decides to close a channel or connection, it
|
|
// sends a Close method. The receiving peer MUST respond to a Close with a
|
|
// Close-Ok, and then both parties can close their channel or connection. Note
|
|
// that if peers ignore Close, deadlock can happen when both peers send Close
|
|
// at the same time."
|
|
//
|
|
// When we don't have a channel, so we must respond with close-ok on a close
|
|
// method. This can happen between a channel exception on an asynchronous
|
|
// method like basic.publish and a synchronous close with channel.close.
|
|
// In that case, we'll get both a channel.close and channel.close-ok in any
|
|
// order.
|
|
func (c *Connection) dispatchClosed(f frame) {
|
|
// Only consider method frames, drop content/header frames
|
|
if mf, ok := f.(*methodFrame); ok {
|
|
switch mf.Method.(type) {
|
|
case *channelClose:
|
|
c.send(&methodFrame{
|
|
ChannelId: f.channel(),
|
|
Method: &channelCloseOk{},
|
|
})
|
|
case *channelCloseOk:
|
|
// we are already closed, so do nothing
|
|
default:
|
|
// unexpected method on closed channel
|
|
c.closeWith(ErrClosed)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Reads each frame off the IO and hand off to the connection object that
|
|
// will demux the streams and dispatch to one of the opened channels or
|
|
// handle on channel 0 (the connection channel).
|
|
func (c *Connection) reader(r io.Reader) {
|
|
buf := bufio.NewReader(r)
|
|
frames := &reader{buf}
|
|
conn, haveDeadliner := r.(readDeadliner)
|
|
|
|
for {
|
|
frame, err := frames.ReadFrame()
|
|
|
|
if err != nil {
|
|
c.shutdown(&Error{Code: FrameError, Reason: err.Error()})
|
|
return
|
|
}
|
|
|
|
c.demux(frame)
|
|
|
|
if haveDeadliner {
|
|
c.deadlines <- conn
|
|
}
|
|
}
|
|
}
|
|
|
|
// Ensures that at least one frame is being sent at the tuned interval with a
|
|
// jitter tolerance of 1s
|
|
func (c *Connection) heartbeater(interval time.Duration, done chan *Error) {
|
|
const maxServerHeartbeatsInFlight = 3
|
|
|
|
var sendTicks <-chan time.Time
|
|
if interval > 0 {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
sendTicks = ticker.C
|
|
}
|
|
|
|
lastSent := time.Now()
|
|
|
|
for {
|
|
select {
|
|
case at, stillSending := <-c.sends:
|
|
// When actively sending, depend on sent frames to reset server timer
|
|
if stillSending {
|
|
lastSent = at
|
|
} else {
|
|
return
|
|
}
|
|
|
|
case at := <-sendTicks:
|
|
// When idle, fill the space with a heartbeat frame
|
|
if at.Sub(lastSent) > interval-time.Second {
|
|
if err := c.send(&heartbeatFrame{}); err != nil {
|
|
// send heartbeats even after close/closeOk so we
|
|
// tick until the connection starts erroring
|
|
return
|
|
}
|
|
}
|
|
|
|
case conn := <-c.deadlines:
|
|
// When reading, reset our side of the deadline, if we've negotiated one with
|
|
// a deadline that covers at least 2 server heartbeats
|
|
if interval > 0 {
|
|
conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval))
|
|
}
|
|
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Convenience method to inspect the Connection.Properties["capabilities"]
|
|
// Table for server identified capabilities like "basic.ack" or
|
|
// "confirm.select".
|
|
func (c *Connection) isCapable(featureName string) bool {
|
|
capabilities, _ := c.Properties["capabilities"].(Table)
|
|
hasFeature, _ := capabilities[featureName].(bool)
|
|
return hasFeature
|
|
}
|
|
|
|
// allocateChannel records but does not open a new channel with a unique id.
|
|
// This method is the initial part of the channel lifecycle and paired with
|
|
// releaseChannel
|
|
func (c *Connection) allocateChannel() (*Channel, error) {
|
|
c.m.Lock()
|
|
defer c.m.Unlock()
|
|
|
|
if c.IsClosed() {
|
|
return nil, ErrClosed
|
|
}
|
|
|
|
id, ok := c.allocator.next()
|
|
if !ok {
|
|
return nil, ErrChannelMax
|
|
}
|
|
|
|
ch := newChannel(c, uint16(id))
|
|
c.channels[uint16(id)] = ch
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
// releaseChannel removes a channel from the registry as the final part of the
|
|
// channel lifecycle
|
|
func (c *Connection) releaseChannel(id uint16) {
|
|
c.m.Lock()
|
|
defer c.m.Unlock()
|
|
|
|
delete(c.channels, id)
|
|
c.allocator.release(int(id))
|
|
}
|
|
|
|
// openChannel allocates and opens a channel, must be paired with closeChannel
|
|
func (c *Connection) openChannel() (*Channel, error) {
|
|
ch, err := c.allocateChannel()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := ch.open(); err != nil {
|
|
c.releaseChannel(ch.id)
|
|
return nil, err
|
|
}
|
|
return ch, nil
|
|
}
|
|
|
|
// closeChannel releases and initiates a shutdown of the channel. All channel
|
|
// closures should be initiated here for proper channel lifecycle management on
|
|
// this connection.
|
|
func (c *Connection) closeChannel(ch *Channel, e *Error) {
|
|
ch.shutdown(e)
|
|
c.releaseChannel(ch.id)
|
|
}
|
|
|
|
/*
|
|
Channel opens a unique, concurrent server channel to process the bulk of AMQP
|
|
messages. Any error from methods on this receiver will render the receiver
|
|
invalid and a new Channel should be opened.
|
|
|
|
*/
|
|
func (c *Connection) Channel() (*Channel, error) {
|
|
return c.openChannel()
|
|
}
|
|
|
|
func (c *Connection) call(req message, res ...message) error {
|
|
// Special case for when the protocol header frame is sent insted of a
|
|
// request method
|
|
if req != nil {
|
|
if err := c.send(&methodFrame{ChannelId: 0, Method: req}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
select {
|
|
case err, ok := <-c.errors:
|
|
if !ok {
|
|
return ErrClosed
|
|
}
|
|
return err
|
|
|
|
case msg := <-c.rpc:
|
|
// Try to match one of the result types
|
|
for _, try := range res {
|
|
if reflect.TypeOf(msg) == reflect.TypeOf(try) {
|
|
// *res = *msg
|
|
vres := reflect.ValueOf(try).Elem()
|
|
vmsg := reflect.ValueOf(msg).Elem()
|
|
vres.Set(vmsg)
|
|
return nil
|
|
}
|
|
}
|
|
return ErrCommandInvalid
|
|
}
|
|
// unreachable
|
|
}
|
|
|
|
// Connection = open-Connection *use-Connection close-Connection
|
|
// open-Connection = C:protocol-header
|
|
// S:START C:START-OK
|
|
// *challenge
|
|
// S:TUNE C:TUNE-OK
|
|
// C:OPEN S:OPEN-OK
|
|
// challenge = S:SECURE C:SECURE-OK
|
|
// use-Connection = *channel
|
|
// close-Connection = C:CLOSE S:CLOSE-OK
|
|
// / S:CLOSE C:CLOSE-OK
|
|
func (c *Connection) open(config Config) error {
|
|
if err := c.send(&protocolHeader{}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return c.openStart(config)
|
|
}
|
|
|
|
func (c *Connection) openStart(config Config) error {
|
|
start := &connectionStart{}
|
|
|
|
if err := c.call(nil, start); err != nil {
|
|
return err
|
|
}
|
|
|
|
c.Major = int(start.VersionMajor)
|
|
c.Minor = int(start.VersionMinor)
|
|
c.Properties = Table(start.ServerProperties)
|
|
c.Locales = strings.Split(start.Locales, " ")
|
|
|
|
// eventually support challenge/response here by also responding to
|
|
// connectionSecure.
|
|
auth, ok := pickSASLMechanism(config.SASL, strings.Split(start.Mechanisms, " "))
|
|
if !ok {
|
|
return ErrSASL
|
|
}
|
|
|
|
// Save this mechanism off as the one we chose
|
|
c.Config.SASL = []Authentication{auth}
|
|
|
|
// Set the connection locale to client locale
|
|
c.Config.Locale = config.Locale
|
|
|
|
return c.openTune(config, auth)
|
|
}
|
|
|
|
func (c *Connection) openTune(config Config, auth Authentication) error {
|
|
if len(config.Properties) == 0 {
|
|
config.Properties = Table{
|
|
"product": defaultProduct,
|
|
"version": defaultVersion,
|
|
}
|
|
}
|
|
|
|
config.Properties["capabilities"] = Table{
|
|
"connection.blocked": true,
|
|
"consumer_cancel_notify": true,
|
|
}
|
|
|
|
ok := &connectionStartOk{
|
|
ClientProperties: config.Properties,
|
|
Mechanism: auth.Mechanism(),
|
|
Response: auth.Response(),
|
|
Locale: config.Locale,
|
|
}
|
|
tune := &connectionTune{}
|
|
|
|
if err := c.call(ok, tune); err != nil {
|
|
// per spec, a connection can only be closed when it has been opened
|
|
// so at this point, we know it's an auth error, but the socket
|
|
// was closed instead. Return a meaningful error.
|
|
return ErrCredentials
|
|
}
|
|
|
|
// When the server and client both use default 0, then the max channel is
|
|
// only limited by uint16.
|
|
c.Config.ChannelMax = pick(config.ChannelMax, int(tune.ChannelMax))
|
|
if c.Config.ChannelMax == 0 {
|
|
c.Config.ChannelMax = defaultChannelMax
|
|
}
|
|
c.Config.ChannelMax = min(c.Config.ChannelMax, maxChannelMax)
|
|
|
|
// Frame size includes headers and end byte (len(payload)+8), even if
|
|
// this is less than FrameMinSize, use what the server sends because the
|
|
// alternative is to stop the handshake here.
|
|
c.Config.FrameSize = pick(config.FrameSize, int(tune.FrameMax))
|
|
|
|
// Save this off for resetDeadline()
|
|
c.Config.Heartbeat = time.Second * time.Duration(pick(
|
|
int(config.Heartbeat/time.Second),
|
|
int(tune.Heartbeat)))
|
|
|
|
// "The client should start sending heartbeats after receiving a
|
|
// Connection.Tune method"
|
|
go c.heartbeater(c.Config.Heartbeat, c.NotifyClose(make(chan *Error, 1)))
|
|
|
|
if err := c.send(&methodFrame{
|
|
ChannelId: 0,
|
|
Method: &connectionTuneOk{
|
|
ChannelMax: uint16(c.Config.ChannelMax),
|
|
FrameMax: uint32(c.Config.FrameSize),
|
|
Heartbeat: uint16(c.Config.Heartbeat / time.Second),
|
|
},
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return c.openVhost(config)
|
|
}
|
|
|
|
func (c *Connection) openVhost(config Config) error {
|
|
req := &connectionOpen{VirtualHost: config.Vhost}
|
|
res := &connectionOpenOk{}
|
|
|
|
if err := c.call(req, res); err != nil {
|
|
// Cannot be closed yet, but we know it's a vhost problem
|
|
return ErrVhost
|
|
}
|
|
|
|
c.Config.Vhost = config.Vhost
|
|
|
|
return c.openComplete()
|
|
}
|
|
|
|
// openComplete performs any final Connection initialization dependent on the
|
|
// connection handshake and clears any state needed for TLS and AMQP handshaking.
|
|
func (c *Connection) openComplete() error {
|
|
// We clear the deadlines and let the heartbeater reset the read deadline if requested.
|
|
// RabbitMQ uses TCP flow control at this point for pushback so Writes can
|
|
// intentionally block.
|
|
if deadliner, ok := c.conn.(interface {
|
|
SetDeadline(time.Time) error
|
|
}); ok {
|
|
_ = deadliner.SetDeadline(time.Time{})
|
|
}
|
|
|
|
c.allocator = newAllocator(1, c.Config.ChannelMax)
|
|
return nil
|
|
}
|
|
|
|
func max(a, b int) int {
|
|
if a > b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
func min(a, b int) int {
|
|
if a < b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
func pick(client, server int) int {
|
|
if client == 0 || server == 0 {
|
|
return max(client, server)
|
|
}
|
|
return min(client, server)
|
|
}
|