The following is a stream-of-consciousness diagnosis and debugging of a hanging shutdown:
Debugging the shutdownTracerProvider
I noticed that the test suite was failing to cleanly exit.
The final thing in the app is withGlobalTracing, which looks like:
bracket initializeTracing shutdownTracerProvider (const action)
This bracket comes from UnliftIO.Exception, which means that shutdownTracerProvider runs in an uninterruptibleMask. This makes it pretty fragile - if it blocks, it may become unkillable.
Switching it to a Control.Exception.bracket, whose cleanup runs under an ordinary interruptible mask rather than an uninterruptible one, did allow the shutdown to complete. So something potentially odd is going on here!
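If you want to see that difference in isolation, here's a tiny repro sketch (not the app code; the delays are arbitrary):

import Control.Concurrent (forkIO, killThread, threadDelay)
import qualified Control.Exception as CE
import qualified UnliftIO.Exception as UE

main :: IO ()
main = do
  -- UnliftIO.Exception.bracket runs its cleanup under uninterruptibleMask,
  -- so killThread can't interrupt the blocking cleanup and waits out the
  -- full five-second delay.
  t1 <- forkIO $ UE.bracket (pure ()) (\_ -> threadDelay 5000000) (\_ -> threadDelay 100000)
  threadDelay 200000
  killThread t1
  putStrLn "UnliftIO bracket: killed only after the cleanup finished"
  -- Control.Exception.bracket runs its cleanup under an interruptible mask,
  -- so the threadDelay in the cleanup receives the exception and the thread
  -- dies promptly.
  t2 <- forkIO $ CE.bracket (pure ()) (\_ -> threadDelay 5000000) (\_ -> threadDelay 100000)
  threadDelay 200000
  killThread t2
  putStrLn "Control.Exception bracket: killed promptly"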
I did a crude timer on the shutdown, and it took 30 seconds.
That's a suspiciously round number - smells like a timeout to me.
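The crude timer is nothing fancy - something like this sketch using GHC.Clock does the job:

import GHC.Clock (getMonotonicTime)

timed :: String -> IO a -> IO a
timed label action = do
  start <- getMonotonicTime
  result <- action
  end <- getMonotonicTime
  putStrLn $ label <> " took " <> show (end - start) <> "s"
  pure result

-- e.g. timed "shutdownTracerProvider" (shutdownTracerProvider provider)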
-- | This method provides a way for provider to do any cleanup required.
--
-- This will also trigger shutdowns on all internal processors.
--
-- @since 0.0.1.0
shutdownTracerProvider :: MonadIO m => TracerProvider -> m ()
shutdownTracerProvider TracerProvider{..} = liftIO $ do
asyncShutdownResults <- forM tracerProviderProcessors $ \processor -> do
processorShutdown processor
mapM_ wait asyncShutdownResults
So, we'll look at each processor, call processorShutdown on it, and then wait on those Asyncs to finish. processorShutdown is a field on the Processor record:
data Processor = Processor
{ processorOnStart :: IORef ImmutableSpan -> Context -> IO ()
-- ^ Called when a span is started. This method is called synchronously on the thread that started the span, therefore it should not block or throw exceptions.
, processorOnEnd :: IORef ImmutableSpan -> IO ()
-- ^ Called after a span is ended (i.e., the end timestamp is already set). This method is called synchronously within the 'OpenTelemetry.Trace.endSpan' API, therefore it should not block or throw an exception.
, processorShutdown :: IO (Async ShutdownResult)
-- ^ Shuts down the processor. Called when SDK is shut down. This is an opportunity for processor to do any cleanup required.
--
-- Shutdown SHOULD be called only once for each SpanProcessor instance. After the call to Shutdown, subsequent calls to OnStart, OnEnd, or ForceFlush are not allowed. SDKs SHOULD ignore these calls gracefully, if possible.
--
-- Shutdown SHOULD let the caller know whether it succeeded, failed or timed out.
--
-- Shutdown MUST include the effects of ForceFlush.
--
-- Shutdown SHOULD complete or abort within some timeout. Shutdown can be implemented as a blocking API or an asynchronous API which notifies the caller via a callback or an event. OpenTelemetry client authors can decide if they want to make the shutdown timeout configurable.
{- snip ... -}
Since processorShutdown is a record field, we now need to find where this provider is created. Fortunately, that's right there in the bracket:
-- src/Mercury/Tracing.hs
initializeTracing :: MonadUnliftIO m => m TracerProvider
initializeTracing = do
(processors, tracerOptions') <- liftIO getTracerProviderInitializationOptions
-- FIXME: this is probably an upstream bug in hs-opentelemetry, but regardless
-- Throw away all spans if the OTLP endpoint is not configured: anyone
-- exporting to Honeycomb would have to have set this, so it should be fine
otlpEndpoint <- liftIO $ lookupEnv "OTEL_EXPORTER_OTLP_ENDPOINT"
let processors' = case otlpEndpoint of
Just _ -> processors
Nothing -> []
provider <- createTracerProvider processors' tracerOptions'
setGlobalTracerProvider provider
pure provider
So we use the processors for the default, but only if the otlpEndpoint is
populated.
-- src/OpenTelemetry/Trace.hs
getTracerProviderInitializationOptions :: IO ([Processor], TracerProviderOptions)
getTracerProviderInitializationOptions = getTracerProviderInitializationOptions' (mempty :: Resource 'Nothing)
getTracerProviderInitializationOptions' :: (ResourceMerge 'Nothing any ~ 'Nothing) => Resource any -> IO ([Processor], TracerProviderOptions)
getTracerProviderInitializationOptions' rs = do
sampler <- detectSampler
attrLimits <- detectAttributeLimits
spanLimits <- detectSpanLimits
propagators <- detectPropagators
processorConf <- detectBatchProcessorConfig
exporters <- detectExporters
builtInRs <- detectBuiltInResources
envVarRs <- mkResource . map Just <$> detectResourceAttributes
let allRs = mergeResources (builtInRs <> envVarRs) rs
processors <- case exporters of
[] -> do
pure []
e:_ -> do
pure <$> batchProcessor processorConf e
let providerOpts = emptyTracerProviderOptions
{ tracerProviderOptionsIdGenerator = defaultIdGenerator
, tracerProviderOptionsSampler = sampler
, tracerProviderOptionsAttributeLimits = attrLimits
, tracerProviderOptionsSpanLimits = spanLimits
, tracerProviderOptionsPropagators = propagators
, tracerProviderOptionsResources = materializeResources allRs
}
pure (processors, providerOpts)
-- src/OpenTelemetry/Trace/Core.hs
-- | Initialize a new tracer provider
--
-- You should generally use 'getGlobalTracerProvider' for most applications.
createTracerProvider :: MonadIO m => [Processor] -> TracerProviderOptions -> m TracerProvider
createTracerProvider ps opts = liftIO $ do
let g = tracerProviderOptionsIdGenerator opts
pure $ TracerProvider
(V.fromList ps)
g
(tracerProviderOptionsSampler opts)
(tracerProviderOptionsResources opts)
(tracerProviderOptionsAttributeLimits opts)
(tracerProviderOptionsSpanLimits opts)
(tracerProviderOptionsPropagators opts)
(tracerProviderOptionsLogger opts)
So createTracerProvider isn't doing anything fancy. processors is either an empty list, or, if we have at least one exporter, then we create the batchProcessor. Nothing else modifies it, so we're good to just look at the function:
-- |
-- The batch processor accepts spans and places them into batches. Batching helps better compress the data and reduce the number of outgoing connections
-- required to transmit the data. This processor supports both size and time based batching.
--
batchProcessor :: MonadIO m => BatchTimeoutConfig -> Exporter ImmutableSpan -> m Processor
batchProcessor BatchTimeoutConfig{..} exporter = liftIO $ do
batch <- newIORef $ boundedMap maxQueueSize
workSignal <- newEmptyMVar
worker <- async $ loop $ do
req <- liftIO $ timeout (millisToMicros scheduledDelayMillis)
$ takeMVar workSignal
batchToProcess <- liftIO $ atomicModifyIORef' batch buildExport
res <- liftIO $ Exporter.exporterExport exporter batchToProcess
-- if we were asked to shutdown, quit cleanly after this batch
-- FIXME: this could lose batches if there's more than one in queue?
case req of
Just Shutdown -> throwE res
_ -> pure ()
pure $ Processor
{ processorOnStart = \_ _ -> pure ()
, processorOnEnd = \s -> do
span_ <- readIORef s
appendFailed <- atomicModifyIORef' batch $ \builder ->
case push span_ builder of
Nothing -> (builder, True)
Just b' -> (b', False)
when appendFailed $ void $ tryPutMVar workSignal Flush
, processorForceFlush = void $ tryPutMVar workSignal Flush
-- TODO where to call restore, if anywhere?
, processorShutdown = async $ mask $ \_restore -> do
-- flush remaining messages
void $ tryPutMVar workSignal Shutdown
shutdownResult <- timeout (millisToMicros exportTimeoutMillis) $
wait worker
-- make sure the worker comes down
uninterruptibleCancel worker
-- TODO, not convinced we should shut down processor here
case shutdownResult of
Nothing -> pure ShutdownFailure
Just _ -> pure ShutdownSuccess
}
where
millisToMicros = (* 1000)
There's a lot here, so let's just look at processorShutdown - that's the actual code we're running.
-- TODO where to call restore, if anywhere?
, processorShutdown = async $ mask $ \_restore -> do
-- flush remaining messages
void $ tryPutMVar workSignal Shutdown
shutdownResult <- timeout (millisToMicros exportTimeoutMillis) $
wait worker
-- make sure the worker comes down
uninterruptibleCancel worker
-- TODO, not convinced we should shut down processor here
case shutdownResult of
Nothing -> pure ShutdownFailure
Just _ -> pure ShutdownSuccess
Well, we have a mask there, and we never restore it. But the original code was in an uninterruptibleMask thanks to bracket anyway.
Line by line, let's look at these:
-- flush remaining messages
void $ tryPutMVar workSignal Shutdown
tryPutMVar is guaranteed not to block, so it's unlikely to be a problem here. However, if the workSignal is already full, then the Shutdown message is silently dropped.
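To make that concrete, here's a tiny sketch with a stand-in Signal type (the real signal type lives alongside the batch processor):

import Control.Concurrent.MVar (newMVar, readMVar, tryPutMVar)

data Signal = Flush | Shutdown deriving Show

main :: IO ()
main = do
  workSignal <- newMVar Flush              -- the worker hasn't drained it yet
  accepted <- tryPutMVar workSignal Shutdown
  print accepted                           -- False: the Shutdown went nowhere
  readMVar workSignal >>= print            -- still Flush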
shutdownResult <- timeout (millisToMicros exportTimeoutMillis) $
wait worker
This may be our culprit. timeout comes from System.Timeout in the base package. The docs have a hint:
-- |Wrap an 'IO' computation to time out and return @Nothing@ in case no result
-- is available within @n@ microseconds (@1\/10^6@ seconds). In case a result
-- is available before the timeout expires, @Just a@ is returned. A negative
-- timeout interval means \"wait indefinitely\". When specifying long timeouts,
-- be careful not to exceed @maxBound :: Int@.
--
-- >>> timeout 1000000 (threadDelay 1000 *> pure "finished on time")
-- Just "finished on time"
--
-- >>> timeout 10000 (threadDelay 100000 *> pure "finished on time")
-- Nothing
--
-- The design of this combinator was guided by the objective that @timeout n f@
-- should behave exactly the same as @f@ as long as @f@ doesn't time out. This
-- means that @f@ has the same 'myThreadId' it would have without the timeout
-- wrapper. Any exceptions @f@ might throw cancel the timeout and propagate
-- further up. It also possible for @f@ to receive exceptions thrown to it by
-- another thread.
--
-- A tricky implementation detail is the question of how to abort an @IO@
-- computation. This combinator relies on asynchronous exceptions internally
-- (namely throwing the computation the 'Timeout' exception). The technique
-- works very well for computations executing inside of the Haskell runtime
-- system, but it doesn't work at all for non-Haskell code. Foreign function
-- calls, for example, cannot be timed out with this combinator simply because
-- an arbitrary C function cannot receive asynchronous exceptions. When
-- @timeout@ is used to wrap an FFI call that blocks, no timeout event can be
-- delivered until the FFI call returns, which pretty much negates the purpose
-- of the combinator. In practice, however, this limitation is less severe than
-- it may sound. Standard I\/O functions like 'System.IO.hGetBuf',
-- 'System.IO.hPutBuf', Network.Socket.accept, or 'System.IO.hWaitForInput'
-- appear to be blocking, but they really don't because the runtime system uses
-- scheduling mechanisms like @select(2)@ to perform asynchronous I\/O, so it
-- is possible to interrupt standard socket I\/O or file I\/O using this
-- combinator.
---
-- Note that 'timeout' cancels the computation by throwing it the 'Timeout'
-- exception. Consequently blanket exception handlers (e.g. catching
-- 'SomeException') within the computation will break the timeout behavior.
Ah, so, alas, it cancels the computation by throwing an asynchronous exception. But we are in an uninterruptibly masked state, so the exception is never delivered to our timeout. That explains why UnliftIO.Exception.bracket causes the problem.
(Also, that timeout? That's the 30 seconds we're waiting on -- the exportTimeoutMillis in batchTimeoutConfig defaults to 30,000 milliseconds.)
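You can watch this interaction in isolation with a little sketch - shorter delays than the real 30-second export timeout, but the same shape:

import Control.Concurrent (threadDelay)
import Control.Exception (uninterruptibleMask_)
import System.Timeout (timeout)

main :: IO ()
main = do
  -- Normally the Timeout exception interrupts the delay after one second.
  r1 <- timeout 1000000 (threadDelay 5000000)
  print r1  -- Nothing, after about one second
  -- Under uninterruptibleMask the exception can't be delivered, so the
  -- delay runs to completion and the "timeout" quietly succeeds.
  r2 <- uninterruptibleMask_ $ timeout 1000000 (threadDelay 5000000)
  print r2  -- Just (), after the full five seconds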
Woof. That's why our thread is not dying, but why isn't it, uh, dying
correctly?
Well, back to the shutdown code - what happens next?
-- make sure the worker comes down
uninterruptibleCancel worker
-- TODO, not convinced we should shut down processor here
Huh. Not entirely clear what goes on here!
-- | Cancel an asynchronous action
--
-- This is a variant of `cancel`, but it is not interruptible.
{-# INLINE uninterruptibleCancel #-}
uninterruptibleCancel :: Async a -> IO ()
uninterruptibleCancel = uninterruptibleMask_ . cancel
OK, so we're not saying "you MUST cancel, NOW", we're saying, "We're going to
cancel you, and the cancellation is uninterruptible." Let's look at cancel itself:
-- | Cancel an asynchronous action by throwing the @AsyncCancelled@
-- exception to it, and waiting for the `Async` thread to quit.
-- Has no effect if the 'Async' has already completed.
--
-- > cancel a = throwTo (asyncThreadId a) AsyncCancelled <* waitCatch a
--
-- Note that 'cancel' will not terminate until the thread the 'Async'
-- refers to has terminated. This means that 'cancel' will block for
-- as long said thread blocks when receiving an asynchronous exception.
--
-- For example, it could block if:
--
-- * It's executing a foreign call, and thus cannot receive the asynchronous
-- exception;
-- * It's executing some cleanup handler after having received the exception,
-- and the handler is blocking.
{-# INLINE cancel #-}
cancel :: Async a -> IO ()
cancel a@(Async t _) = throwTo t AsyncCancelled <* waitCatch a
OK, so cancel actually waits for the Async to finish! So if the Async isn't responding to shutdown for whatever reason, then cancel won't return.
It seems to me that we should just be using cancel. That way, if we receive an async exception in our shutdown, then we don't completely freak out.
case shutdownResult of
Nothing -> pure ShutdownFailure
Just _ -> pure ShutdownSuccess
OK, this is pure, nothing funny or fancy here.
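Putting that suggestion together, the shutdown could look more like this (a sketch of the proposed change, not the library's current code):

, processorShutdown = async $ do
    -- flush remaining messages
    void $ tryPutMVar workSignal Shutdown
    shutdownResult <- timeout (millisToMicros exportTimeoutMillis) $
      wait worker
    -- plain cancel still waits for the worker to come down, but stays
    -- responsive to async exceptions aimed at the shutdown itself
    cancel worker
    case shutdownResult of
      Nothing -> pure ShutdownFailure
      Just _ -> pure ShutdownSuccess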
Investigating the worker
OK, so timeout is why we're not dying, but the real reason is that we're doing something that takes forever inside of that timeout - namely, wait worker. So let's look at worker and see what it's doing.
worker <- async $ loop $ do
req <- liftIO $ timeout (millisToMicros scheduledDelayMillis)
$ takeMVar workSignal
batchToProcess <- liftIO $ atomicModifyIORef' batch buildExport
res <- liftIO $ Exporter.exporterExport exporter batchToProcess
-- if we were asked to shutdown, quit cleanly after this batch
-- FIXME: this could lose batches if there's more than one in queue?
case req of
Just Shutdown -> throwE res
_ -> pure ()
worker is an Async ExportResult. loop is a neat trick:
-- | Exitable forever loop
loop :: Monad m => ExceptT e m a -> m e
loop = liftM (either id id) . runExceptT . forever
throwE terminates the loop, handing back its argument as the final result.
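Here's a quick self-contained sketch of how that combinator behaves (countToThree is made up for illustration):

import Control.Monad (forever, liftM)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Except (ExceptT, runExceptT, throwE)
import Data.IORef (atomicModifyIORef', newIORef)

loop :: Monad m => ExceptT e m a -> m e
loop = liftM (either id id) . runExceptT . forever

-- runs the body repeatedly until the counter hits three, at which point
-- throwE ends the loop and its argument becomes the overall result
countToThree :: IO Int
countToThree = do
  ref <- newIORef (0 :: Int)
  loop $ do
    n <- lift $ atomicModifyIORef' ref (\x -> (x + 1, x + 1))
    if n >= 3 then throwE n else pure ()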
So, when the worker is created, we use async. When we call withGlobalTracing, we aren't in a masked state, and the initialization doesn't appear to do any masking either. So this should be cancellable.
We have another timeout - this time, we're trying to takeMVar from the workSignal. If workSignal is empty, this will block, so the takeMVar waits for up to about 5,000 ms - 5 seconds - by default. There are a few things that put values into the work signal; a Shutdown causes the loop to exit.
Exporter.exporterExport is a record field:
data Exporter a = Exporter
{ exporterExport :: HashMap InstrumentationLibrary (Vector a) -> IO ExportResult
, exporterShutdown :: IO ()
}
That was provided in getTracerProviderInitializationOptions':
exporters <- detectExporters
{- snip -}
processors <- case exporters of
[] -> do
pure []
e:_ -> do
pure <$> batchProcessor processorConf e
So, where does this come from?
knownExporters :: [(T.Text, IO (Exporter ImmutableSpan))]
knownExporters =
[ ("otlp", do
otlpConfig <- loadExporterEnvironmentVariables
otlpExporter otlpConfig
)
, ("jaeger", error "Jaeger exporter not implemented")
, ("zipkin", error "Zipkin exporter not implemented")
]
-- TODO, support multiple exporters
detectExporters :: IO [Exporter ImmutableSpan]
detectExporters = do
exportersInEnv <- fmap (T.splitOn "," . T.pack) <$> lookupEnv "OTEL_TRACES_EXPORTER"
if exportersInEnv == Just ["none"]
then pure []
else do
let envExporters = fromMaybe ["otlp"] exportersInEnv
exportersAndRegistryEntry = map (\k -> maybe (Left k) Right $ lookup k knownExporters) envExporters
(_notFound, exporterIntializers) = partitionEithers exportersAndRegistryEntry
-- TODO, notFound logging
sequence exporterIntializers
So, the only real option appears to be otlpExporter. Digging in... it's a big function, so skim it real quick.
-- | Initial the OTLP 'Exporter'
otlpExporter :: (MonadIO m) => OTLPExporterConfig -> m (Exporter OT.ImmutableSpan)
otlpExporter conf = do
-- TODO, url parsing is janky
-- TODO configurable retryDelay, maximum retry counts
req <- liftIO $ parseRequest (maybe "http://localhost:4318/v1/traces" (<> "/v1/traces") (otlpEndpoint conf))
let (encodingHeader, encoder) = maybe (id, id)
(\case
None -> (id, id)
GZip -> (((hContentEncoding, "gzip") :), compress)
)
(otlpTracesCompression conf <|> otlpCompression conf)
baseReqHeaders = encodingHeader $
(hContentType, protobufMimeType) :
(hAcceptEncoding, protobufMimeType) :
fromMaybe [] (otlpHeaders conf) ++
fromMaybe [] (otlpTracesHeaders conf) ++
requestHeaders req
baseReq = req
{ method = "POST"
, requestHeaders = baseReqHeaders
}
pure $ Exporter
{ exporterExport = \spans_ -> do
let anySpansToExport = H.size spans_ /= 0 && not (all V.null $ H.elems spans_)
if anySpansToExport
then do
result <- try $ exporterExportCall encoder baseReq spans_
case result of
Left err -> do
print err
pure $ Failure $ Just err
Right ok -> pure ok
else pure Success
, exporterShutdown = pure ()
}
where
retryDelay = 100_000 -- 100ms
maxRetryCount = 5
isRetryableStatusCode status_ =
status_ == status408 || status_ == status429 || (statusCode status_ >= 500 && statusCode status_ < 600)
isRetryableException = \case
ResponseTimeout -> True
ConnectionTimeout -> True
ConnectionFailure _ -> True
ConnectionClosed -> True
_ -> False
exporterExportCall encoder baseReq spans_ = do
msg <- encodeMessage <$> immutableSpansToProtobuf spans_
-- TODO handle server disconnect
let req = baseReq
{ requestBody =
RequestBodyLBS $ encoder $ L.fromStrict msg
}
sendReq req 0 -- TODO =<< getTime for maximum cutoff
sendReq req backoffCount = do
eResp <- try $ httpBS req
let exponentialBackoff = if backoffCount == maxRetryCount
then pure $ Failure Nothing
else do
threadDelay (retryDelay `shiftL` backoffCount)
sendReq req (backoffCount + 1)
case eResp of
Left err@(HttpExceptionRequest _ e) -> if isRetryableException e
then exponentialBackoff
else pure $ Failure $ Just $ SomeException err
Left err -> pure $ Failure $ Just $ SomeException err
Right resp -> if isRetryableStatusCode (responseStatus resp)
then case lookup hRetryAfter $ responseHeaders resp of
Nothing -> exponentialBackoff
Just retryAfter -> do
-- TODO support date in retry-after header
case readMaybe $ C.unpack retryAfter of
Nothing -> exponentialBackoff
Just seconds -> do
threadDelay (seconds * 1_000_000)
sendReq req (backoffCount + 1)
else pure $! if statusCode (responseStatus resp) >= 300
then Failure Nothing
else Success
Wow! OK, that's pretty huge. The active code appears to be the then branch inside of the record field:
if anySpansToExport
then do
result <- try $ exporterExportCall encoder baseReq spans_
case result of
Left err -> do
print err
pure $ Failure $ Just err
Right ok -> pure ok
else pure Success
printing? in my dependencies? it's more likely than you think
anyway, we're definitely doing something that can take a while. Looks like our app is potentially doing a lot of threadDelay to try and get these spans in.
Well, that could explain the behavior we're seeing - if we're stuck in an exponentialBackoff that starts at 100ms and doubles on every failure, we may see exactly this behavior.
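Quick back-of-the-envelope on that backoff, using the constants from the exporter above:

import Data.Bits (shiftL)

-- delays happen for backoffCount 0 through 4; at 5 the exporter gives up
totalBackoffMicros :: Int
totalBackoffMicros = sum [100000 `shiftL` n | n <- [0 .. 4]]
-- 3,100,000 microseconds, i.e. about 3.1 seconds of pure threadDelay per
-- failed export, on top of whatever the six HTTP attempts themselves take.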
Just out of curiosity, it does seem like... this should eventually end? So I'm
just going to let it run for maybe 20 minutes or so and see what happens.
Starting at about 3:15 my time...
and at 3:24, it's still goin'. Not good. Something is up.
result <- try $ exporterExportCall encoder baseReq spans_
case result of
Left err -> do
print err
pure $ Failure $ Just err
That err is a SomeException:
data ExportResult
= Success
| Failure (Maybe SomeException)
which means we could have caught an async exception, since this is plain Control.Exception:
import Control.Exception (SomeException(..), try)
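Here's that hazard in miniature (a sketch with arbitrary delays, not the library code): a blanket try at SomeException happily swallows the AsyncCancelled that cancel throws, and the thread keeps running.

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, cancel)
import Control.Exception (SomeException, try)

main :: IO ()
main = do
  a <- async $ do
    r <- try (threadDelay 2000000) :: IO (Either SomeException ())
    putStrLn $ "caught: " <> show r   -- Left AsyncCancelled lands here
    threadDelay 2000000               -- ...and the thread carries on anyway
  threadDelay 100000
  cancel a                            -- blocks until the thread finally ends
  putStrLn "done, eventually"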
But, if we got a Failure, then we should have handled that in the call to the exporter, right? Let's jump back up the stack a bit, and see that exporter call.
worker <- async $ loop $ do
req <- liftIO $ timeout (millisToMicros scheduledDelayMillis)
$ takeMVar workSignal
batchToProcess <- liftIO $ atomicModifyIORef' batch buildExport
res <- liftIO $ Exporter.exporterExport exporter batchToProcess
-- if we were asked to shutdown, quit cleanly after this batch
-- FIXME: this could lose batches if there's more than one in queue?
case req of
Just Shutdown -> throwE res
_ -> pure ()
oops! Unless we're shutting down, we don't actually do anything with res! Success or failure, we just kinda... ignore it.
So my new hypothesis is something like this:
The exporterExport caught an async exception in its try, and this was dropped in the worker loop. For some reason, we don't get the memo to shut down, and the timeout never completes, because the thread is blocked waiting on the worker.
So, what else can fill that MVar?
, processorOnEnd = \s -> do
span_ <- readIORef s
appendFailed <- atomicModifyIORef' batch $ \builder ->
case push span_ builder of
Nothing -> (builder, True)
Just b' -> (b', False)
when appendFailed $ void $ tryPutMVar workSignal Flush
, processorForceFlush = void $ tryPutMVar workSignal Flush
These are our two candidates. They can do a tryPutMVar with Flush, which would not shut the worker down.
What calls processorOnEnd? endSpan, which is used liberally in our codebase as part of inSpan = bracket createSpan endSpan.
OK, let's run this through:
- worker starts a loop iteration. The timeout on takeMVar starts.
- endSpan happens, and we get a Flush in the MVar.
- worker's takeMVar succeeds, emptying the MVar.
- worker then calls exporterExport, which begins the export. This may take a long time if the request fails for some reason!
- endSpan happens again, and we get another Flush in the MVar.
- BOOM, we've called processorShutdown. Now we tryPutMVar Shutdown, but this does nothing because the MVar already has a Flush in it.
- The timeout begins, waiting for the worker to complete. But because the Shutdown write didn't take, it won't ever stop.
- Then the worker gets a cancel, receiving an async exception to kill the thread. BUT... if this happens inside that try? Then we swallow the exception and keep going! Most likely, because that try surrounds a lot of threadDelay, we will throw into that try.
- wait worker never completes.
I'm gonna work up a PR that should alleviate these issues.
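For what it's worth, here's one possible shape such a fix could take - a sketch under assumptions (a separate shutdown flag plus plain cancel), and emphatically not the actual PR:

shutdownRef <- newIORef False
worker <- async $ loop $ do
  req <- liftIO $ timeout (millisToMicros scheduledDelayMillis)
    $ takeMVar workSignal
  batchToProcess <- liftIO $ atomicModifyIORef' batch buildExport
  res <- liftIO $ Exporter.exporterExport exporter batchToProcess
  shuttingDown <- liftIO $ readIORef shutdownRef
  -- exit whether the request arrived via the MVar or via the flag,
  -- and hand res back instead of silently dropping it
  case req of
    Just Shutdown -> throwE res
    _ | shuttingDown -> throwE res
      | otherwise -> pure ()
{- snip: processorOnStart, processorOnEnd, processorForceFlush unchanged -}
, processorShutdown = async $ do
    writeIORef shutdownRef True
    -- still worth waking the worker if it happens to be idle
    void $ tryPutMVar workSignal Shutdown
    shutdownResult <- timeout (millisToMicros exportTimeoutMillis) $
      wait worker
    -- plain cancel: waits for the worker, but stays responsive to async
    -- exceptions aimed at the shutdown itself
    cancel worker
    case shutdownResult of
      Nothing -> pure ShutdownFailure
      Just _ -> pure ShutdownSuccess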