IO & Logging Capabilities
Using newtypes and type classes, we create a wrapper around IO with support for logging capabilities. This approach keeps log entries as values, enables log aggregation, and supports diagnostic contexts, while also making logging easily testable. For dispatching logs, any preferred library can be used.
Logging Challenges
When log entries aren’t values, it’s difficult to test logging. How would we test what gets output from println or logger.info? Hopefully we’ve defined a Logger[F] trait to enable testing and abstracting over the effect type F[_] (e.g. IO). Even with a trait in place, there are still a few things left to be desired.
Log entries are spread throughout the application. It’s difficult to locate all of them. While this gives great flexibility, it can be painful to understand what and where an application is logging, especially when coming back to the code later on.
Logging code is not separated from other concerns. Since it’s easy to put in an extra log expression, code for what and how entries get logged is often mixed with other concerns, instead of cleanly separating the different concerns.
Without log accumulation, it’s tricky to understand how a request was processed. Most often, we’ll need some identifier for the request, so we can later piece together all log entries for the request. This is especially true when we’re processing several requests in parallel.
In tests, we’re forced to assert on messages. It’s a good idea to assert on what messages are dispatched, but we don’t want to always assert on the message, since we’d then be forced to manually update several tests as we change the message.
The solution to these challenges is to model log entries as values. We’ll also see how we can add explicit support for log accumulation, take full control over exactly when log messages are being dispatched, and make testing our logging easy.
Introducing MonadLog
The MonadLog type class, defined below, represents an immutable append-only log, where we can also replace the log with an empty one. Essentially, we can append log entries E with log, and then extract them again in context G[_] with extract. We can also clear all logs with clear. By combining extract and clear, we can write flush, which clears the logs after having dispatched them, using the provided dispatch function G[E] => F[Unit].
import cats.{Applicative, Monad, MonoidK}

trait MonadLog[F[_], G[_], E] {
  val monad: Monad[F]
  val applicative: Applicative[G]
  val monoidK: MonoidK[G]

  import monad.{as, flatMap}

  def log(e: E): F[Unit]
  def clear[A](fa: F[A]): F[A]
  def extract[A](fa: F[A]): F[(A, G[E])]

  def flush[A](fa: F[A])(f: G[E] => F[Unit]): F[A] =
    clear(flatMap(extract(fa)) { case (a, ge) => as(f(ge), a) })
}
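To get a feel for how log, extract, clear, and flush interact, here is a dependency-free sketch that models F[A] as a plain value paired with its accumulated entries. The Logged type, the dispatched buffer, and the sample program are illustrative assumptions rather than part of the code in this post, and Vector stands in for G[_].

```scala
object MonadLogSketch {
  // Toy stand-in for F[A]: a value together with the entries logged so far.
  final case class Logged[E, A](logs: Vector[E], value: A) {
    def flatMap[B](f: A => Logged[E, B]): Logged[E, B] = {
      val next = f(value)
      Logged(logs ++ next.logs, next.value) // earlier entries come first
    }
    def map[B](f: A => B): Logged[E, B] = Logged(logs, f(value))
  }

  def log[E](e: E): Logged[E, Unit] = Logged(Vector(e), ())

  def clear[E, A](fa: Logged[E, A]): Logged[E, A] =
    Logged(Vector.empty[E], fa.value)

  def extract[E, A](fa: Logged[E, A]): Logged[E, (A, Vector[E])] =
    Logged(fa.logs, (fa.value, fa.logs))

  // flush: hand the accumulated entries to `dispatch`, then clear the log.
  def flush[E, A](fa: Logged[E, A])(dispatch: Vector[E] => Logged[E, Unit]): Logged[E, A] =
    clear(extract(fa).flatMap { case (a, es) => dispatch(es).map(_ => a) })

  // Two entries are accumulated alongside the result value.
  val program: Logged[String, Int] =
    for {
      _ <- log("first")
      _ <- log("second")
    } yield 42

  // Records what reaches the dispatcher (a mutable buffer, for illustration only).
  val dispatched = scala.collection.mutable.ListBuffer.empty[Vector[String]]

  val flushed: Logged[String, Int] =
    flush(program) { es =>
      dispatched += es
      Logged(Vector.empty[String], ())
    }
}
```

In this model, flushing hands both entries to the dispatcher in a single call and leaves an empty log next to the unchanged result.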
If you’re familiar with cats-mtl, log looks a bit like tell from FunctorTell, and extract reminds us of listen from FunctorListen. The main difference is that log is strictly an append operation, as shown by the following laws. These laws are in addition to the laws for Monad[F], Applicative[G], and MonoidK[G].
import cats.laws.IsEqArrow
import cats.syntax.applicative._
import cats.syntax.apply._
import cats.syntax.functor._
import org.typelevel.discipline.Laws

trait MonadLogLaws[F[_], G[_], E] extends Laws {
  implicit val logInstance: MonadLog[F, G, E]
  implicit val monad: Monad[F] = logInstance.monad
  implicit val applicative: Applicative[G] = logInstance.applicative
  implicit val monoidK: MonoidK[G] = logInstance.monoidK

  import logInstance._

  private val G: MonoidK[G] = monoidK

  def clearRemovesLog(e: E) =
    clear(log(e)) <-> ().pure[F]

  // Same as FunctorListen#listenAddsNoEffects
  def extractAddsNoEffects[A](fa: F[A]) =
    extract(fa).map(_._1) <-> fa

  def extractRespectsClear[A](fa: F[A]) =
    extract(clear(fa)) <-> clear(fa).map((_, G.empty[E]))

  // Similar to FunctorListen#listenRespectsTell
  def extractRespectsLog(e: E) =
    extract(log(e)) <-> log(e).map((_, e.pure[G]))

  def extractRespectsLogs(e1: E, e2: E) =
    extract(log(e1) *> log(e2)) <-> (log(e1) *> log(e2))
      .map((_, G.combineK(e1.pure[G], e2.pure[G])))
}
As far as concrete types go, F[_] will be our IO wrapper type. MonadLog requires Applicative[G] and MonoidK[G], and we will have to require Foldable[G] to be able to write the dispatch function for flush. Ideally, G[_] should support constant time $\mathcal{O}(1)$ append and a single $\mathcal{O}(n)$ in-order fold. The recently added Chain in Cats does fit those requirements. The log entry type E is up to us to define, and we’ll define it as LogEntry in the next section.
Defining Log Entries
We will model log entries with LogEntry, which consists of a LogLevel and a NonEmptyString message. Let’s start by defining LogLevel. Log levels (info, warning, error, …) essentially form an ordered set. We use a wrapper around a refinement type for Int to represent the log levels. Values correspond to the severity levels used by syslog and log management systems like Graylog. Note that we could have used Byte, but there is no support for Byte literals in Scala, so the definition would not be as concise.
import cats.Order
import cats.instances.int._
import cats.instances.order._
import cats.syntax.contravariant._
import eu.timepit.refined.W
import eu.timepit.refined.api.Refined
import eu.timepit.refined.auto._
import eu.timepit.refined.cats.refTypeOrder
import eu.timepit.refined.numeric.Interval

sealed abstract class LogLevel(val value: LogLevel.Value)

object LogLevel {
  type Value = Int Refined Interval.Closed[W.`0`.T, W.`7`.T]

  case object Error extends LogLevel(3)
  case object Warning extends LogLevel(4)
  case object Info extends LogLevel(6)
  case object Debug extends LogLevel(7)

  implicit val logLevelOrder: Order[LogLevel] =
    Order[Value].contramap(_.value)
}
We can then define LogEntry as follows, together with any log entries we want. We’ve now effectively already solved the first logging challenge: log entries are now in a single place, and it should be rather easy to figure out where they’re being used throughout the application.
import cats.syntax.show._
import cats.instances.string._
import eu.timepit.refined.cats.refTypeShow
import eu.timepit.refined.types.numeric.NonNegInt
import eu.timepit.refined.types.string.NonEmptyString

sealed abstract class LogEntry {
  def level: LogLevel
  def message: NonEmptyString
}

object LogEntry {
  case object ApplicationStarted extends LogEntry {
    override def level: LogLevel = LogLevel.Info
    override def message: NonEmptyString = "Application started"
  }

  final case class ReceivedRequest(request: NonEmptyString) extends LogEntry {
    override def level: LogLevel = LogLevel.Info
    override def message: NonEmptyString =
      NonEmptyString.unsafeFrom(show"Received request: $request")
  }

  final case class RequestStillFailing(retries: NonNegInt) extends LogEntry {
    override def level: LogLevel = LogLevel.Error
    override def message: NonEmptyString =
      NonEmptyString.unsafeFrom(show"Request still failing after $retries retries")
  }
}
For log entries without parameters, we can use case objects, and for entries with parameters, we use case classes. Note that we can vary both the log level and message depending on the arguments. This brings us to the second logging challenge: logic relating to how and what gets logged is now encapsulated in the LogEntry, rather than mixed with other concerns.
We use the show interpolator to generate Strings with Show instances, rather than relying on toString, for extra compile-time safety. With String interpolation, we cannot rely on compile-time refinement, so we have to wrap our message in NonEmptyString.unsafeFrom. Since each message starts with a literal prefix, it’s still clear the message is not empty.
Finally, since we’re looking to support diagnostic contexts, we’ll define MdcEntry for that purpose.
sealed abstract class MdcEntry(
  val key: NonEmptyString,
  val value: NonEmptyString
)

object MdcEntry {
  final case class TraceToken(override val value: NonEmptyString)
    extends MdcEntry("traceToken", value)
}
A Logging Algebra
Now that we’ve defined the relevant logging models, let’s focus our attention on functions. We’re going to define an algebra in final tagless style, but before doing so, we’re going to need functions for determining the LogLevel and NonEmptyString message from log entries G[LogEntry]. We are going to have to require Foldable[G] below to implement these functions.
We define the log level of G[LogEntry] as the most critical level among the log entries. This means we want the lowest level, since level 3 is for Error and level 7 is for Debug. We have already defined Order[LogLevel] and can make use of it here.
import cats.Foldable
import cats.syntax.foldable._

def logLevel[G[_]: Foldable](entries: G[LogEntry]): Option[LogLevel] =
  entries.minimumOption(Order.by(_.level)).map(_.level)
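The same "lowest value wins" rule can be tried out without Cats or refined. The following is a minimal sketch under simplifying assumptions: levels carry plain Int values, a List stands in for G[_], and the names Level and mostCritical are made up for illustration.

```scala
object LogLevelSketch {
  // Syslog-style severities as plain Ints: a lower number is more critical.
  sealed abstract class Level(val value: Int)
  case object Error extends Level(3)
  case object Warning extends Level(4)
  case object Info extends Level(6)
  case object Debug extends Level(7)

  // Returns None for an empty list, otherwise the level with the lowest value.
  def mostCritical(levels: List[Level]): Option[Level] =
    levels.reduceOption((a, b) => if (b.value < a.value) b else a)
}
```

A batch containing Info, Error, and Debug entries would therefore be dispatched at Error.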
For the log message of G[LogEntry], we have more options available, but will go with the following.
- If G[LogEntry] is empty, there are no log entries, so return None.
- If G[LogEntry] contains a single entry, return the message of that entry.
- If G[LogEntry] has two or more entries, prepend "- " to each entry, and separate the entries by newlines.
Following is an example of a message for two log entries.
- Received request: test
- Request still failing after 3 retries
There are multiple ways to write such a function. One way, which uses a single fold, but likely is quite slow due to the heavy use of String concatenation, is the following. Depending on your use case, you might want to explore writing more efficient variants of this function. Note that we make use of Eval to avoid constructing the NonEmptyString in cases where we’re trying to log at a level which has been disabled.
import cats.{Always, Eval, Semigroup}
import cats.syntax.option._
import cats.syntax.semigroup._

implicit val nonEmptyStringSemigroup: Semigroup[NonEmptyString] =
  Semigroup.instance((a, b) => NonEmptyString.unsafeFrom(a.value ++ b.value))

def logMessage[G[_]: Foldable](entries: G[LogEntry]): Option[Eval[NonEmptyString]] = {
  def entry(e: LogEntry): NonEmptyString =
    ("- ": NonEmptyString) combine e.message

  def append(s: NonEmptyString, e: LogEntry): NonEmptyString =
    s combine "\n" combine entry(e)

  def combine(a: LogEntry, b: LogEntry): NonEmptyString =
    append(entry(a), b)

  entries
    .foldLeft[Option[(Option[LogEntry], Eval[NonEmptyString])]](None) {
      case (None, a) => (a.some -> Always(a.message)).some
      case (Some((Some(a), _)), b) => (none -> Always(combine(a, b))).some
      case (Some((None, a)), b) => (none -> a.map(append(_, b))).some
    }
    .map { case (_, message) => message }
}
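As one possible direction for a cheaper variant, the sketch below applies the same three rules with a single StringBuilder instead of repeated concatenation. It is a simplification and not a drop-in replacement: it works on a List of plain String messages, it is strict rather than wrapped in Eval, and the name LogMessageSketch is only illustrative.

```scala
object LogMessageSketch {
  def logMessage(messages: List[String]): Option[String] =
    messages match {
      case Nil           => None         // no entries, nothing to dispatch
      case single :: Nil => Some(single) // a single entry is used as-is
      case many =>
        // Two or more entries: prefix each with "- " and join with newlines.
        val sb = new StringBuilder
        many.foreach { m =>
          if (sb.length > 0) sb.append('\n')
          sb.append("- ").append(m)
        }
        Some(sb.toString)
    }
}
```

To keep the laziness of the original, the StringBuilder branch could be deferred in the same way the fold above defers its work with Eval.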
We’re now ready to define the Logging algebra as follows.
trait Logging[F[_], G[_]] {
  def log(entry: LogEntry): F[Unit]
  def logNow(entry: LogEntry, mdc: G[MdcEntry]): F[Unit]
  def dispatchLogs[A](fa: F[A], mdc: G[MdcEntry]): F[A]
  def extractLogs[A](fa: F[A]): F[(A, G[LogEntry])]
}
The log function accepts a LogEntry and stores it in an F[Unit]. The idea is that we compose several F[_] values in our application, and accumulate the logs from the included log expressions. At some point we’ll call dispatchLogs, which accepts the diagnostic context, and dispatches the logs using flush on MonadLog. The logNow function logs the provided LogEntry, but immediately dispatches it with dispatchLogs, much like a normal log expression would. Finally, we’re including extractLogs as it can be useful to analyse logs, e.g. in order to derive certain metrics, or for testing purposes.
In practice, you might add more useful functions to Logging. For example, overloaded versions of the logNow and dispatchLogs functions which do not require you to pass an empty G[_] whenever you don’t want a diagnostic context. We can easily add those functions by also requiring MonoidK[G]. Other candidates include logN and logNowN for logging multiple entries at once.
Let’s take a look at how we could implement Logging. The main function of interest is dispatchLogs, which makes use of the logLevel and logMessage functions we wrote before. If there are no logs accumulated when dispatchLogs gets called, we’ll simply not do anything. Assuming there are logs to dispatch, we dispatch them with the provided dispatch function. This function will finally log the accumulated logs with a logging library.
import cats.instances.option._

object Logging {
  def create[F[_], G[_]](dispatch: (Eval[NonEmptyString], G[MdcEntry], LogLevel) => F[Unit])(
    implicit F: MonadLog[F, G, LogEntry],
    G: Foldable[G]
  ): Logging[F, G] =
    new Logging[F, G] {
      override def log(entry: LogEntry): F[Unit] =
        F.log(entry)

      override def logNow(entry: LogEntry, mdc: G[MdcEntry]): F[Unit] =
        dispatchLogs(log(entry), mdc)

      override def dispatchLogs[A](fa: F[A], mdc: G[MdcEntry]): F[A] =
        F.flush(fa) { entries =>
          (logMessage(entries), mdc.some, logLevel(entries))
            .mapN(dispatch)
            .getOrElse(F.monad.unit)
        }

      override def extractLogs[A](fa: F[A]): F[(A, G[LogEntry])] =
        F.extract(fa)
    }
}
The IOLog Newtype
What remains to do before we can use our Logging algebra? We need to define the dispatch function with our logging library of choice, and we need to implement MonadLog for our effect type. We’ll start by implementing MonadLog for a wrapper type around IO, since MonadLog cannot directly be implemented for IO. We’ll call this wrapper type IOLog.
import cats.data.Chain
import cats.effect.IO

object effects {
  import IOLog.Context

  final case class IOLog[A, E](toIO: IO[Context[A, E]])

  object IOLog {
    final case class Context[A, E](logs: Chain[E], value: A)
  }
}
There are a couple of things worth highlighting in the definition above.
- IOLog is a wrapper around IO with a Context containing the accumulated logs. We could have used extends AnyVal as the newtype here, and there are also alternative newtype encodings without any kind of runtime overhead.
- Having to wrap values with Context is the cost of achieving log accumulation. For simplicity, we’re using a case class for Context, but we could use a more compact representation, like e.g. a tuple.
Notice that IO[Context[A, E]] looks like IO[(Chain[E], A)] without the case class. This happens to be the exact definition of WriterT. This means we can define IOLog using WriterT instead, and we have just saved ourselves having to write the newtype above.
import cats.data.WriterT
type IOLog[A, E] = WriterT[IO, Chain[E], A]
In our case, the log entry E in IOLog will be LogEntry, so we define a type alias AIO for it.
type AIO[A] = IOLog[A, LogEntry]
AIO can be thought of as ‘Application IO’ or ‘All-In-One’.
Type Class Instances
To use IOLog as our F[_] in the application, we’ll have to rely on the type classes in Cats Effect. Luckily, Cats Effect already provides type class instances for WriterT, so we only have to implement MonadLog for IOLog. We can go one step further and implement MonadLog for WriterT more generally.
implicit def writerTMonadLog[F[_], G[_], E](
  implicit
  M: Monad[WriterT[F, G[E], ?]],
  F: Applicative[F],
  A: Applicative[G],
  K: MonoidK[G]
): MonadLog[WriterT[F, G[E], ?], G, E] =
  new MonadLog[WriterT[F, G[E], ?], G, E] {
    override val monad: Monad[WriterT[F, G[E], ?]] = M
    override val applicative: Applicative[G] = A
    override val monoidK: MonoidK[G] = K

    override def log(e: E): WriterT[F, G[E], Unit] =
      WriterT.tell(A.pure(e))

    override def clear[A](wt: WriterT[F, G[E], A]): WriterT[F, G[E], A] =
      wt.mapWritten(_ => K.empty)

    override def extract[A](wt: WriterT[F, G[E], A]): WriterT[F, G[E], (A, G[E])] =
      wt.mapBoth((ge, a) => (ge, (a, ge)))
  }
Be sure to check the MonadLog laws for the instance. Refer to the sources to see how it’s done.
Dispatching Messages
We’re now ready to create an instance of our Logging algebra. In our case, we’ll make use of SLF4J and Logback to define the dispatch function, but you’re free to implement it with whatever logging library, or way of dispatching logs, you would like.
import cats.effect.Sync
import org.slf4j.{LoggerFactory, MDC}

def createLogger[F[_], G[_]](
  implicit F: Sync[F],
  M: MonadLog[F, G, LogEntry],
  G: Foldable[G]
): F[Logging[F, G]] =
  F.delay(LoggerFactory.getLogger("App")).map { logger =>
    Logging.create {
      case (message, mdc, level) => F.suspend {
        val levelEnabled = level match {
          case LogLevel.Error => logger.isErrorEnabled
          case LogLevel.Warning => logger.isWarnEnabled
          case LogLevel.Info => logger.isInfoEnabled
          case LogLevel.Debug => logger.isDebugEnabled
        }

        if (levelEnabled) {
          val setContext =
            mdc.foldLeft(F.unit) { (ms, m) =>
              ms *> F.delay(MDC.put(m.key.value, m.value.value))
            }

          val resetContext =
            mdc.foldLeft(F.unit) { (ms, m) =>
              ms *> F.delay(MDC.remove(m.key.value)).void
            }

          F.bracket(setContext) { _ =>
            F.delay(level match {
              case LogLevel.Error => logger.error(message.value.value)
              case LogLevel.Warning => logger.warn(message.value.value)
              case LogLevel.Info => logger.info(message.value.value)
              case LogLevel.Debug => logger.debug(message.value.value)
            })
          }(_ => resetContext)
        } else F.unit
      }
    }
  }
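The set-context, log, reset-context shape of the dispatch function can be seen in isolation in the following sketch. It does not use SLF4J: a mutable map named context is a hypothetical stand-in for MDC, a buffer named output stands in for the logger, and try/finally plays the role of bracket.

```scala
import scala.collection.mutable

object MdcSketch {
  // Hypothetical stand-in for org.slf4j.MDC: a plain mutable map.
  val context: mutable.Map[String, String] = mutable.Map.empty

  // Stand-in for a logger: records each message with a snapshot of the context.
  val output: mutable.ListBuffer[(String, Map[String, String])] =
    mutable.ListBuffer.empty

  // Puts the entries into the context, runs `body`, and always removes them again.
  def withContext[A](entries: List[(String, String)])(body: => A): A = {
    entries.foreach { case (k, v) => context.put(k, v) }
    try body
    finally entries.foreach { case (k, _) => context.remove(k) }
  }

  def logWithContext(message: String, entries: List[(String, String)]): Unit =
    withContext(entries) {
      output += ((message, context.toMap))
      ()
    }
}
```

The point of the cleanup step is that diagnostic entries from one dispatch never leak into the next one, even if logging throws.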
Putting It Together
We can now start to use our Logging algebra, either via functions, or as arguments to other algebras. In the example below, we’ve defined a Processing algebra which makes use of the Logging algebra to log some details about how the request was processed.
import cats.FlatMap
import cats.syntax.flatMap._
import LogEntry._

trait Processing[F[_]] {
  def processRequest(request: NonEmptyString): F[Unit]
}

object Processing {
  def create[F[_], G[_]](logging: Logging[F, G])(implicit F: FlatMap[F]): Processing[F] =
    new Processing[F] {
      import logging._

      override def processRequest(request: NonEmptyString): F[Unit] =
        for {
          _ <- log(ReceivedRequest(request))
          _ <- log(RequestStillFailing(3))
        } yield ()
    }
}
Finally, we’re putting everything together inside IOApp.
import cats.effect.{ExitCode, IOApp}

object App extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val noMdc: Chain[MdcEntry] = Chain.empty

    (for {
      logging <- createLogger[AIO, Chain]
      _ <- logging.logNow(ApplicationStarted, noMdc)
      processing = Processing.create(logging)
      mdc = Chain.one(MdcEntry.TraceToken("traceToken"))
      process = processing.processRequest("test")
      _ <- logging.dispatchLogs(process, mdc)
    } yield ExitCode.Success).value
  }
}
The application outputs the following two log messages: the first at Info level, the second at Error level.
Application started
- Received request: test
- Request still failing after 3 retries
We’ve finally accomplished log accumulation and the third logging challenge: log entries relating to a single request can be accumulated, and logged together as a single message with diagnostic context.
Testing Logging
The fourth logging challenge relates to testing: how can we avoid always having to assert on messages? When log entries are modelled as LogEntry values, this becomes rather easy. We’ll start by defining a test implementation of the Logging algebra, which simply stores all dispatched messages in a Ref, instead of dispatching log messages with a logging library.
import cats.effect.concurrent.Ref
import cats.SemigroupK
import cats.syntax.semigroupk._

object TestLogging {
  def create[F[_], G[_]](logs: Ref[F, G[(NonEmptyString, G[MdcEntry], LogLevel)]])(
    implicit
    M: MonadLog[F, G, LogEntry],
    F: Foldable[G],
    S: SemigroupK[G],
    A: Applicative[G]
  ): Logging[F, G] =
    Logging.create {
      case (message, mdc, level) =>
        logs.modify(g => (g.combineK((message.value, mdc, level).pure[G]), ()))
    }
}
We’ll then define Arbitrary instances for IOLog and LogEntry, and Eq instances for LogEntry and MdcEntry. Note that refined has a ScalaCheck module which provides Arbitrary instances for many refinement types, so we don’t have to write them ourselves.
import cats.Eq
import cats.syntax.eq._
import eu.timepit.refined.cats.refTypeEq
import eu.timepit.refined.scalacheck.numeric._
import eu.timepit.refined.scalacheck.string._
import org.scalacheck.Arbitrary.arbitrary
import org.scalacheck.{Arbitrary, Gen}

trait AIOTestInstances {
  implicit def ioLogArbitrary[A, E](
    implicit
    ArbIOA: Arbitrary[IO[A]],
    ArbCE: Arbitrary[Chain[E]]
  ): Arbitrary[IOLog[A, E]] = Arbitrary {
    for {
      ioa <- arbitrary[IO[A]]
      ce <- arbitrary[Chain[E]]
    } yield WriterT(ioa.map((ce, _)))
  }

  implicit val logEntryArbitrary: Arbitrary[LogEntry] =
    Arbitrary {
      Gen.oneOf(
        Gen.const(ApplicationStarted),
        arbitrary[NonEmptyString].map(ReceivedRequest),
        arbitrary[NonNegInt].map(RequestStillFailing)
      )
    }

  implicit val logEntryEq: Eq[LogEntry] =
    Eq.instance {
      case (ApplicationStarted, ApplicationStarted) => true
      case (ApplicationStarted, _) | (_, ApplicationStarted) => false
      case (ReceivedRequest(a), ReceivedRequest(b)) => a === b
      case (ReceivedRequest(_), _) | (_, ReceivedRequest(_)) => false
      case (RequestStillFailing(a), RequestStillFailing(b)) => a === b
      case (RequestStillFailing(_), _) | (_, RequestStillFailing(_)) => false
    }

  implicit val mdcEntryEq: Eq[MdcEntry] =
    Eq.instance((a, b) => a.key === b.key && a.value === b.value)
}
We can then write tests for the Processing algebra using the test Logging implementation. Note that we can assert both on accumulated log entries and on dispatched log messages. We’re free to assert on accumulated logs by comparing LogEntrys, instead of always having to assert on messages.
import cats.effect.ContextShift
import cats.effect.laws.util.TestContext
import cats.instances.tuple._
import org.scalatest.{Assertion, AsyncFunSpec}

final class ProcessingSpec extends AsyncFunSpec with AIOTestInstances {
  test("should accumulate two log entries") {
    case (_, logging, processing) =>
      import logging._
      import processing._

      val request: NonEmptyString = "test"

      val expected =
        Chain.concat(
          Chain.one(ReceivedRequest(request)),
          Chain.one(RequestStillFailing(3))
        )

      extractLogs(processRequest(request))
        .map { case (_, logs) => assert(logs === expected) }
  }

  test("should dispatch a single log message") {
    case (logs, logging, processing) =>
      import logging._
      import processing._

      val request: NonEmptyString = "test"
      val mdc = Chain.one(MdcEntry.TraceToken("traceToken"))

      val message =
        NonEmptyString.unsafeFrom {
          show"""
            |- Received request: $request
            |- Request still failing after 3 retries
          """.stripMargin.trim
        }

      val expected = Chain.one((message, mdc, LogLevel.Error))

      for {
        _ <- dispatchLogs(processRequest(request), mdc)
        dispatchedLogs <- logs.get
      } yield assert(dispatchedLogs === expected)
  }

  type Dispatched = Chain[(NonEmptyString, Chain[MdcEntry], LogLevel)]
  type TestInput = (Ref[AIO, Dispatched], Logging[AIO, Chain], Processing[AIO])

  def test(name: String)(f: TestInput => AIO[Assertion]): Unit = {
    implicit val contextShift: ContextShift[IO] =
      IO.contextShift(TestContext())

    it(name) {
      (for {
        logs <- Ref.of[AIO, Dispatched](Chain.empty)
        logging = TestLogging.create(logs)
        processing = Processing.create(logging)
        assertion <- f((logs, logging, processing))
      } yield assertion).value.unsafeToFuture()
    }
  }
}
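Stripped of the effect types, the benefit of asserting on entries rather than messages can be shown in a few lines. The sketch below is a simplified assumption: entries carry plain String and Int fields, and processRequest is a hypothetical function that just returns its accumulated entries.

```scala
object EntryAssertionSketch {
  sealed trait Entry { def message: String }

  final case class ReceivedRequest(request: String) extends Entry {
    def message: String = s"Received request: $request"
  }

  final case class RequestStillFailing(retries: Int) extends Entry {
    def message: String = s"Request still failing after $retries retries"
  }

  // Hypothetical function under test: returns the entries it accumulated.
  def processRequest(request: String): Vector[Entry] =
    Vector(ReceivedRequest(request), RequestStillFailing(3))

  // The expectation compares values, so rewording `message` does not affect it.
  val expected: Vector[Entry] =
    Vector(ReceivedRequest("test"), RequestStillFailing(3))
}
```

A test comparing processRequest("test") with expected keeps passing when the wording of a message changes, and only a test that deliberately checks the rendered text would need updating.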
Limitations
There is one important limitation to the outlined logging approach.
- When an error occurs, any accumulated log entries are lost. This is a consequence of keeping logs as values. If that’s unacceptable for some logs, then resort to logNow for those entries. This means you might have to piece together the log entries later using an identifier in the diagnostic context.
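Why the entries disappear can be seen in a toy model where scala.util.Try stands in for IO: the log only exists inside a successful result, so a failure has nowhere to carry it. The Logged alias and the helper functions below are illustrative assumptions, not the types used in this post.

```scala
import scala.util.{Failure, Success, Try}

object LostLogsSketch {
  // Toy analogue of WriterT[IO, Chain[E], A]: logs live inside the success value.
  type Logged[A] = Try[(Vector[String], A)]

  def log(entry: String): Logged[Unit] = Success((Vector(entry), ()))

  def fail[A](message: String): Logged[A] =
    Failure(new RuntimeException(message))

  // Sequencing appends the logs of the second step to those of the first.
  def flatMap[A, B](fa: Logged[A])(f: A => Logged[B]): Logged[B] =
    fa.flatMap { case (logs, a) =>
      f(a).map { case (more, b) => (logs ++ more, b) }
    }

  // Both entries survive when every step succeeds.
  val ok: Logged[Unit] = flatMap(log("step 1"))(_ => log("step 2"))

  // The failure replaces the whole result, so "step 1" is no longer reachable.
  val failed: Logged[Unit] = flatMap(log("step 1"))(_ => fail[Unit]("boom"))
}
```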
There is also one technical limitation worth knowing about.
- Logs captured in asyncF, and release of bracketCase (and bracket) will be discarded. (#371)
Sources
The code shown throughout this post is also available on GitHub.