{-# LANGUAGE RecordWildCards #-}
module OpenTelemetry.Processor.Batch.Span (
BatchTimeoutConfig (..),
batchTimeoutConfig,
batchProcessor,
) where
import Control.Concurrent (rtsSupportsBoundThreads)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import Data.Vector (Vector)
import OpenTelemetry.Exporter.Span (SpanExporter)
import qualified OpenTelemetry.Exporter.Span as SpanExporter
import OpenTelemetry.Processor.Span
import OpenTelemetry.Trace.Core
import VectorBuilder.Builder as Builder
import VectorBuilder.Vector as Builder
data BatchTimeoutConfig = BatchTimeoutConfig
{ BatchTimeoutConfig -> Int
maxQueueSize :: Int
, BatchTimeoutConfig -> Int
scheduledDelayMillis :: Int
, BatchTimeoutConfig -> Int
exportTimeoutMillis :: Int
, BatchTimeoutConfig -> Int
maxExportBatchSize :: Int
}
deriving (Int -> BatchTimeoutConfig -> ShowS
[BatchTimeoutConfig] -> ShowS
BatchTimeoutConfig -> String
(Int -> BatchTimeoutConfig -> ShowS)
-> (BatchTimeoutConfig -> String)
-> ([BatchTimeoutConfig] -> ShowS)
-> Show BatchTimeoutConfig
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> BatchTimeoutConfig -> ShowS
showsPrec :: Int -> BatchTimeoutConfig -> ShowS
$cshow :: BatchTimeoutConfig -> String
show :: BatchTimeoutConfig -> String
$cshowList :: [BatchTimeoutConfig] -> ShowS
showList :: [BatchTimeoutConfig] -> ShowS
Show)
batchTimeoutConfig :: BatchTimeoutConfig
batchTimeoutConfig :: BatchTimeoutConfig
batchTimeoutConfig =
BatchTimeoutConfig
{ maxQueueSize :: Int
maxQueueSize = Int
1024
, scheduledDelayMillis :: Int
scheduledDelayMillis = Int
5000
, exportTimeoutMillis :: Int
exportTimeoutMillis = Int
30000
, maxExportBatchSize :: Int
maxExportBatchSize = Int
512
}
data BoundedMap a = BoundedMap
{ forall a. BoundedMap a -> Int
itemBounds :: !Int
, forall a. BoundedMap a -> Int
itemMaxExportBounds :: !Int
, forall a. BoundedMap a -> Int
itemCount :: !Int
, forall a.
BoundedMap a -> HashMap InstrumentationLibrary (Builder a)
itemMap :: HashMap InstrumentationLibrary (Builder.Builder a)
}
boundedMap :: Int -> Int -> BoundedMap a
boundedMap :: forall a. Int -> Int -> BoundedMap a
boundedMap Int
bounds Int
exportBounds = Int
-> Int
-> Int
-> HashMap InstrumentationLibrary (Builder a)
-> BoundedMap a
forall a.
Int
-> Int
-> Int
-> HashMap InstrumentationLibrary (Builder a)
-> BoundedMap a
BoundedMap Int
bounds Int
exportBounds Int
0 HashMap InstrumentationLibrary (Builder a)
forall a. Monoid a => a
mempty
push :: ImmutableSpan -> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
push :: ImmutableSpan
-> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
push ImmutableSpan
s BoundedMap ImmutableSpan
m =
if BoundedMap ImmutableSpan -> Int
forall a. BoundedMap a -> Int
itemCount BoundedMap ImmutableSpan
m Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= BoundedMap ImmutableSpan -> Int
forall a. BoundedMap a -> Int
itemBounds BoundedMap ImmutableSpan
m
then Maybe (BoundedMap ImmutableSpan)
forall a. Maybe a
Nothing
else
BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
forall a. a -> Maybe a
Just (BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan))
-> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
forall a b. (a -> b) -> a -> b
$!
BoundedMap ImmutableSpan
m
{ itemCount = itemCount m + 1
, itemMap =
HashMap.insertWith
(<>)
(tracerName $ spanTracer s)
(Builder.singleton s)
$ itemMap m
}
buildExport :: BoundedMap a -> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport :: forall a.
BoundedMap a
-> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport BoundedMap a
m =
( BoundedMap a
m {itemCount = 0, itemMap = mempty}
, Builder a -> Vector a
forall (vector :: * -> *) element.
Vector vector element =>
Builder element -> vector element
Builder.build (Builder a -> Vector a)
-> HashMap InstrumentationLibrary (Builder a)
-> HashMap InstrumentationLibrary (Vector a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> BoundedMap a -> HashMap InstrumentationLibrary (Builder a)
forall a.
BoundedMap a -> HashMap InstrumentationLibrary (Builder a)
itemMap BoundedMap a
m
)
data ProcessorMessage = ScheduledFlush | MaxExportFlush | Shutdown
batchProcessor :: (MonadIO m) => BatchTimeoutConfig -> SpanExporter -> m SpanProcessor
batchProcessor :: forall (m :: * -> *).
MonadIO m =>
BatchTimeoutConfig -> SpanExporter -> m SpanProcessor
batchProcessor BatchTimeoutConfig {Int
maxQueueSize :: BatchTimeoutConfig -> Int
scheduledDelayMillis :: BatchTimeoutConfig -> Int
exportTimeoutMillis :: BatchTimeoutConfig -> Int
maxExportBatchSize :: BatchTimeoutConfig -> Int
maxQueueSize :: Int
scheduledDelayMillis :: Int
exportTimeoutMillis :: Int
maxExportBatchSize :: Int
..} SpanExporter
exporter = IO SpanProcessor -> m SpanProcessor
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO SpanProcessor -> m SpanProcessor)
-> IO SpanProcessor -> m SpanProcessor
forall a b. (a -> b) -> a -> b
$ do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
rtsSupportsBoundThreads (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
forall a. HasCallStack => String -> a
error String
"The hs-opentelemetry batch processor does not work without the -threaded GHC flag!"
IORef (BoundedMap ImmutableSpan)
batch <- BoundedMap ImmutableSpan -> IO (IORef (BoundedMap ImmutableSpan))
forall a. a -> IO (IORef a)
newIORef (BoundedMap ImmutableSpan -> IO (IORef (BoundedMap ImmutableSpan)))
-> BoundedMap ImmutableSpan
-> IO (IORef (BoundedMap ImmutableSpan))
forall a b. (a -> b) -> a -> b
$ Int -> Int -> BoundedMap ImmutableSpan
forall a. Int -> Int -> BoundedMap a
boundedMap Int
maxQueueSize Int
maxExportBatchSize
TMVar ()
workSignal <- IO (TMVar ())
forall a. IO (TMVar a)
newEmptyTMVarIO
TMVar ()
shutdownSignal <- IO (TMVar ())
forall a. IO (TMVar a)
newEmptyTMVarIO
let publish :: HashMap InstrumentationLibrary (Vector ImmutableSpan)
-> IO ExportResult
publish HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess = IO ExportResult -> IO ExportResult
forall a. IO a -> IO a
mask_ (IO ExportResult -> IO ExportResult)
-> IO ExportResult -> IO ExportResult
forall a b. (a -> b) -> a -> b
$ do
SpanExporter
-> HashMap InstrumentationLibrary (Vector ImmutableSpan)
-> IO ExportResult
SpanExporter.spanExporterExport SpanExporter
exporter HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess
let flushQueueImmediately :: ExportResult -> IO ExportResult
flushQueueImmediately ExportResult
ret = do
HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess <- IORef (BoundedMap ImmutableSpan)
-> (BoundedMap ImmutableSpan
-> (BoundedMap ImmutableSpan,
HashMap InstrumentationLibrary (Vector ImmutableSpan)))
-> IO (HashMap InstrumentationLibrary (Vector ImmutableSpan))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (BoundedMap ImmutableSpan)
batch BoundedMap ImmutableSpan
-> (BoundedMap ImmutableSpan,
HashMap InstrumentationLibrary (Vector ImmutableSpan))
forall a.
BoundedMap a
-> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport
if HashMap InstrumentationLibrary (Vector ImmutableSpan) -> Bool
forall a. HashMap InstrumentationLibrary a -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess
then do
ExportResult -> IO ExportResult
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ExportResult
ret
else do
ExportResult
ret' <- HashMap InstrumentationLibrary (Vector ImmutableSpan)
-> IO ExportResult
publish HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess
ExportResult -> IO ExportResult
flushQueueImmediately ExportResult
ret'
let waiting :: IO ProcessorMessage
waiting = do
TVar Bool
delay <- Int -> IO (TVar Bool)
registerDelay (Int -> Int
millisToMicros Int
scheduledDelayMillis)
STM ProcessorMessage -> IO ProcessorMessage
forall a. STM a -> IO a
atomically (STM ProcessorMessage -> IO ProcessorMessage)
-> STM ProcessorMessage -> IO ProcessorMessage
forall a b. (a -> b) -> a -> b
$ do
[STM ProcessorMessage] -> STM ProcessorMessage
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, MonadPlus m) =>
t (m a) -> m a
msum
[ ProcessorMessage
ScheduledFlush ProcessorMessage -> STM () -> STM ProcessorMessage
forall a b. a -> STM b -> STM a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ do
Bool
continue <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
delay
Bool -> STM ()
check Bool
continue
, ProcessorMessage
MaxExportFlush ProcessorMessage -> STM () -> STM ProcessorMessage
forall a b. a -> STM b -> STM a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ TMVar () -> STM ()
forall a. TMVar a -> STM a
takeTMVar TMVar ()
workSignal
, ProcessorMessage
Shutdown ProcessorMessage -> STM () -> STM ProcessorMessage
forall a b. a -> STM b -> STM a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ TMVar () -> STM ()
forall a. TMVar a -> STM a
takeTMVar TMVar ()
shutdownSignal
]
let workerAction :: IO ExportResult
workerAction = do
ProcessorMessage
req <- IO ProcessorMessage
waiting
HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess <- IORef (BoundedMap ImmutableSpan)
-> (BoundedMap ImmutableSpan
-> (BoundedMap ImmutableSpan,
HashMap InstrumentationLibrary (Vector ImmutableSpan)))
-> IO (HashMap InstrumentationLibrary (Vector ImmutableSpan))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (BoundedMap ImmutableSpan)
batch BoundedMap ImmutableSpan
-> (BoundedMap ImmutableSpan,
HashMap InstrumentationLibrary (Vector ImmutableSpan))
forall a.
BoundedMap a
-> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport
ExportResult
res <- HashMap InstrumentationLibrary (Vector ImmutableSpan)
-> IO ExportResult
publish HashMap InstrumentationLibrary (Vector ImmutableSpan)
batchToProcess
case ProcessorMessage
req of
ProcessorMessage
Shutdown ->
ExportResult -> IO ExportResult
flushQueueImmediately ExportResult
res
ProcessorMessage
_ ->
IO ExportResult
workerAction
Async ExportResult
worker <- ((forall a. IO a -> IO a) -> IO ExportResult)
-> IO (Async ExportResult)
forall a. ((forall a. IO a -> IO a) -> IO a) -> IO (Async a)
asyncWithUnmask (((forall a. IO a -> IO a) -> IO ExportResult)
-> IO (Async ExportResult))
-> ((forall a. IO a -> IO a) -> IO ExportResult)
-> IO (Async ExportResult)
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
unmask -> IO ExportResult -> IO ExportResult
forall a. IO a -> IO a
unmask IO ExportResult
workerAction
SpanProcessor -> IO SpanProcessor
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (SpanProcessor -> IO SpanProcessor)
-> SpanProcessor -> IO SpanProcessor
forall a b. (a -> b) -> a -> b
$
SpanProcessor
{ spanProcessorOnStart :: IORef ImmutableSpan -> Context -> IO ()
spanProcessorOnStart = \IORef ImmutableSpan
_ Context
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
, spanProcessorOnEnd :: IORef ImmutableSpan -> IO ()
spanProcessorOnEnd = \IORef ImmutableSpan
s -> do
ImmutableSpan
span_ <- IORef ImmutableSpan -> IO ImmutableSpan
forall a. IORef a -> IO a
readIORef IORef ImmutableSpan
s
Bool
appendFailedOrExportNeeded <- IORef (BoundedMap ImmutableSpan)
-> (BoundedMap ImmutableSpan -> (BoundedMap ImmutableSpan, Bool))
-> IO Bool
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (BoundedMap ImmutableSpan)
batch ((BoundedMap ImmutableSpan -> (BoundedMap ImmutableSpan, Bool))
-> IO Bool)
-> (BoundedMap ImmutableSpan -> (BoundedMap ImmutableSpan, Bool))
-> IO Bool
forall a b. (a -> b) -> a -> b
$ \BoundedMap ImmutableSpan
builder ->
case ImmutableSpan
-> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
push ImmutableSpan
span_ BoundedMap ImmutableSpan
builder of
Maybe (BoundedMap ImmutableSpan)
Nothing -> (BoundedMap ImmutableSpan
builder, Bool
True)
Just BoundedMap ImmutableSpan
b' ->
if BoundedMap ImmutableSpan -> Int
forall a. BoundedMap a -> Int
itemCount BoundedMap ImmutableSpan
b' Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= BoundedMap ImmutableSpan -> Int
forall a. BoundedMap a -> Int
itemMaxExportBounds BoundedMap ImmutableSpan
b'
then
(BoundedMap ImmutableSpan
b', Bool
True)
else (BoundedMap ImmutableSpan
b', Bool
False)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
appendFailedOrExportNeeded (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TMVar () -> () -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar ()
workSignal ()
, spanProcessorForceFlush :: IO ()
spanProcessorForceFlush = IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TMVar () -> () -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar ()
workSignal ()
,
spanProcessorShutdown :: IO (Async ShutdownResult)
spanProcessorShutdown =
((forall a. IO a -> IO a) -> IO ShutdownResult)
-> IO (Async ShutdownResult)
forall a. ((forall a. IO a -> IO a) -> IO a) -> IO (Async a)
asyncWithUnmask (((forall a. IO a -> IO a) -> IO ShutdownResult)
-> IO (Async ShutdownResult))
-> ((forall a. IO a -> IO a) -> IO ShutdownResult)
-> IO (Async ShutdownResult)
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
unmask -> IO ShutdownResult -> IO ShutdownResult
forall a. IO a -> IO a
unmask (IO ShutdownResult -> IO ShutdownResult)
-> IO ShutdownResult -> IO ShutdownResult
forall a b. (a -> b) -> a -> b
$ do
((forall a. IO a -> IO a) -> IO ShutdownResult)
-> IO ShutdownResult
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask (((forall a. IO a -> IO a) -> IO ShutdownResult)
-> IO ShutdownResult)
-> ((forall a. IO a -> IO a) -> IO ShutdownResult)
-> IO ShutdownResult
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
_restore -> do
IO () -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TMVar () -> () -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar ()
shutdownSignal ()
TVar Bool
delay <- Int -> IO (TVar Bool)
registerDelay (Int -> Int
millisToMicros Int
exportTimeoutMillis)
Maybe (Either SomeException ExportResult)
shutdownResult <-
STM (Maybe (Either SomeException ExportResult))
-> IO (Maybe (Either SomeException ExportResult))
forall a. STM a -> IO a
atomically (STM (Maybe (Either SomeException ExportResult))
-> IO (Maybe (Either SomeException ExportResult)))
-> STM (Maybe (Either SomeException ExportResult))
-> IO (Maybe (Either SomeException ExportResult))
forall a b. (a -> b) -> a -> b
$
[STM (Maybe (Either SomeException ExportResult))]
-> STM (Maybe (Either SomeException ExportResult))
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, MonadPlus m) =>
t (m a) -> m a
msum
[ Either SomeException ExportResult
-> Maybe (Either SomeException ExportResult)
forall a. a -> Maybe a
Just (Either SomeException ExportResult
-> Maybe (Either SomeException ExportResult))
-> STM (Either SomeException ExportResult)
-> STM (Maybe (Either SomeException ExportResult))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Async ExportResult -> STM (Either SomeException ExportResult)
forall a. Async a -> STM (Either SomeException a)
waitCatchSTM Async ExportResult
worker
, Maybe (Either SomeException ExportResult)
forall a. Maybe a
Nothing Maybe (Either SomeException ExportResult)
-> STM () -> STM (Maybe (Either SomeException ExportResult))
forall a b. a -> STM b -> STM a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ do
Bool
shouldStop <- TVar Bool -> STM Bool
forall a. TVar a -> STM a
readTVar TVar Bool
delay
Bool -> STM ()
check Bool
shouldStop
]
Async ExportResult -> IO ()
forall a. Async a -> IO ()
cancel Async ExportResult
worker
ShutdownResult -> IO ShutdownResult
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ShutdownResult -> IO ShutdownResult)
-> ShutdownResult -> IO ShutdownResult
forall a b. (a -> b) -> a -> b
$ case Maybe (Either SomeException ExportResult)
shutdownResult of
Maybe (Either SomeException ExportResult)
Nothing ->
ShutdownResult
ShutdownTimeout
Just Either SomeException ExportResult
er ->
case Either SomeException ExportResult
er of
Left SomeException
_ ->
ShutdownResult
ShutdownFailure
Right ExportResult
_ ->
ShutdownResult
ShutdownSuccess
}
where
millisToMicros :: Int -> Int
millisToMicros = (Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000)