I send data to activeMQ server with the following code:
for {
msg = fmt.Sprintf("aaaaa=%v", count)
client.DeliverMsg("/queue/yyyyyyy", []byte(msg))
count++
}
then I close the activeMQ server, deadlock occurs.
I read the code and found that the problem was here(conn.go):
func (c *Conn) Send(destination, contentType string, body []byte, opts ...func(*frame.Frame) error) error {
c.closeMutex.Lock()
defer c.closeMutex.Unlock()
if c.closed {
return ErrAlreadyClosed
}
f, err := createSendFrame(destination, contentType, body, opts)
if err != nil {
return err
}
if _, ok := f.Header.Contains(frame.Receipt); ok {
// receipt required
request := writeRequest{
Frame: f,
C: make(chan *frame.Frame),
}
c.writeCh <- request
response := <-request.C
if response.Command != frame.RECEIPT {
return newError(response)
}
} else {
// no receipt required
request := writeRequest{Frame: f}
c.writeCh <- request
}
return nil
}
In Send function, write request to c.writeCh use "c.writeCh <- request", length of c.writeCh is initialized to 20
the processLoop function will read writeCh:
case req, ok := <-c.writeCh:
// stop the write timeout
if writeTimer != nil {
writeTimer.Stop()
writeTimer = nil
writeTimeoutChannel = nil
}
if !ok {
sendError(channels, errors.New("write channel closed"))
return
}
if req.C != nil {
if receipt, ok := req.Frame.Header.Contains(frame.Receipt); ok {
// remember the channel for this receipt
channels[receipt] = req.C
}
}
switch req.Frame.Command {
case frame.SUBSCRIBE:
id, _ := req.Frame.Header.Contains(frame.Id)
channels[id] = req.C
case frame.UNSUBSCRIBE:
id, _ := req.Frame.Header.Contains(frame.Id)
// is this trying to be too clever -- add a receipt
// header so that when the server responds with a
// RECEIPT frame, the corresponding channel will be closed
req.Frame.Header.Set(frame.Receipt, id)
}
// frame to send
err := writer.Write(req.Frame)
if err != nil {
sendError(channels, err)
return
}
}
If I call Conn.Send very fast, writeCh will be full, Conn.Send will be blocked at "c.writeCh <- request".
Then in processLoop function, writer.Write(req.Frame) will fail if the activeMQ server suddenly shut down.
Before processLoop return, "defer c.MustDisconnect()" in processLoop will be called.
MustDisconnect() call c.closeMutex.Lock() to lock closeMutex, but closeMutex is locked by Conn.Send, Conn.Send is blocked at "c.writeCh <- request", so closeMutex will never be unlocked, deadlock occurs.