by Pawan Mishra

Table of Contents

  1. Unhandled Message
  2. DeadLetter Message

In the previous post, we saw actor-based code in action. At the core of actor-based programming lies the fact that all communication between actors happens by exchanging immutable messages. We can think of these messages as the dialect actors speak: an actor will only respond to messages it understands. In this post, we will look at the following special cases of actors and messages:

  • Invalid message: an actor receives a message it is not capable of handling, aka UnhandledMessage
  • Late message: an actor receives a message after it has died, aka DeadLetter

Unhandled Message


I wrote the following code snippet to illustrate our first use case.

import akka.actor.{Actor, ActorSystem, Props}

case object Message
case object InValidMessage

object ActorsInAction {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ActorSystem")
    val driverActor = system.actorOf(Props[MainActor], "mainActor")

    driverActor ! Message
    driverActor ! InValidMessage
    driverActor ! Message
  }
}

class MainActor extends Actor {
  def receive = {
    case Message => {
      println("Message Received")
    }
  }
}

  • The case objects Message and InValidMessage represent our messages
  • driverActor is the ActorRef of our MainActor, which only handles messages of type Message
  • In the main method, between the two valid Message sends, we pass an invalid message (InValidMessage) to MainActor

Running the app prints the following output:

Message Received
Message Received

Nothing surprising in the output. We know InValidMessage was not handled, but what happened to the message itself? Was it even delivered to MainActor? The answer is yes: the message was indeed delivered. To better understand what happens to a message after an actor receives it, we have to look into the Actor code itself.

If you attach a debugger and step into the Actor code, the call to receive ends up at:

/**
   * INTERNAL API.
   *
   * Can be overridden to intercept calls to this actor's current behavior.
   *
   * @param receive current behavior.
   * @param msg current message.
   */
  protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
    // optimization: avoid allocation of lambda
    if (receive.applyOrElse(msg, Actor.notHandledFun).asInstanceOf[AnyRef] eq Actor.NotHandled) {
      unhandled(msg)
    }
  }

Without going into too much detail, we can see from the if condition above that when receive is not defined for the incoming message, applyOrElse falls back to Actor.notHandledFun, which returns the Actor.NotHandled sentinel. The condition then evaluates to true and unhandled(msg) is called. The implementation of unhandled is quite straightforward:

/**
   * User overridable callback.
   * <p/>
   * Is called when a message isn't handled by the current behavior of the actor
   * by default it fails with either a [[akka.actor.DeathPactException]] (in
   * case of an unhandled [[akka.actor.Terminated]] message) or publishes an [[akka.actor.UnhandledMessage]]
   * to the actor's system's [[akka.event.EventStream]]
   */
  def unhandled(message: Any): Unit = {
    message match {
      case Terminated(dead) => throw new DeathPactException(dead)
      case _                => context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
    }
  }
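
The sentinel check in aroundReceive can be tried out without Akka at all. The following is a minimal sketch in plain Scala, not Akka's actual code: NotHandled and notHandledFun here are hypothetical local stand-ins for Akka's internal members of the same name, and isUnhandled is a helper introduced only for illustration.

```scala
object ApplyOrElseSketch {
  case object Message
  case object InValidMessage

  // Hypothetical stand-ins for Akka's internal Actor.NotHandled and Actor.notHandledFun.
  private case object NotHandled
  private val notHandledFun: Any => Any = _ => NotHandled

  // Same shape as an actor's receive: a partial function defined only for Message.
  val receive: PartialFunction[Any, Unit] = {
    case Message => println("Message Received")
  }

  // Mirrors the check in aroundReceive: true when receive has no matching case,
  // because applyOrElse then calls the fallback, which returns the sentinel.
  def isUnhandled(msg: Any): Boolean =
    receive.applyOrElse(msg, notHandledFun).asInstanceOf[AnyRef] eq NotHandled

  def main(args: Array[String]): Unit = {
    println(isUnhandled(Message))        // false (after printing "Message Received")
    println(isUnhandled(InValidMessage)) // true
  }
}
```

Using a single shared fallback function and comparing by reference (eq) means no new lambda has to be allocated per message, which is what the "optimization" comment in aroundReceive refers to.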

Inside unhandled, since the incoming message is not of type Terminated, our InValidMessage is published to the eventStream wrapped in an UnhandledMessage. Can we handle UnhandledMessage? Yes, we can. In the code below, we create another actor specifically for handling UnhandledMessage and subscribe it as a listener to the eventStream.

import akka.actor.{Actor, ActorSystem, Props, UnhandledMessage}

case object Message
case object InValidMessage

object ActorsInAction {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ActorSystem")
    val driverActor = system.actorOf(Props[MainActor], "mainActor")
    val listener = system.actorOf(Props[DeadActorListener], "deadActor")
    system.eventStream.subscribe(listener, classOf[UnhandledMessage])

    driverActor ! Message
    driverActor ! InValidMessage
    driverActor ! Message
  }
}

class MainActor extends Actor {
  def receive = {
    case Message => {
      println("Message Received")
    }
  }
}

class DeadActorListener extends Actor {
  def receive = {
    case u: UnhandledMessage => println("Unhandled message " + u.message)
  }
}

Output :

Message Received
Message Received
Unhandled message InValidMessage
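
Since unhandled is documented as a user-overridable callback, another option is to intercept the message inside the actor itself rather than through a separate listener. The sketch below assumes we still want the default behaviour (publishing UnhandledMessage to the event stream), so it delegates to super.unhandled; LoggingMainActor and UnhandledOverrideSketch are hypothetical names used only for this example.

```scala
import akka.actor.{Actor, ActorSystem, Props}

case object Message
case object InValidMessage

// Hypothetical variant of MainActor that logs unhandled messages itself.
class LoggingMainActor extends Actor {
  def receive: Receive = {
    case Message => println("Message Received")
  }

  // Called by Akka when receive has no matching case for the message.
  override def unhandled(message: Any): Unit = {
    println(s"${self.path.name} could not handle: $message")
    // Assumption: keep the default behaviour, so subscribers to
    // UnhandledMessage on the event stream are still notified.
    super.unhandled(message)
  }
}

object UnhandledOverrideSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ActorSystem")
    val actor = system.actorOf(Props[LoggingMainActor], "mainActor")

    actor ! Message
    actor ! InValidMessage
  }
}
```

The override keeps the handling logic next to the actor it concerns, while the event stream listener is the better fit when one place should observe unhandled messages from every actor in the system.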

DeadLetter Message


Let's extend our running example to highlight the late-message scenario, i.e. a message arriving while the actor is shutting down or already dead.

import akka.actor.{Actor, ActorSystem, PoisonPill, Props, UnhandledMessage}

case object Message
case object InValidMessage

object ActorsInAction {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ActorSystem")
    val driverActor = system.actorOf(Props[MainActor], "mainActor")
    val listener = system.actorOf(Props[DeadActorListener], "deadActor")
    system.eventStream.subscribe(listener, classOf[UnhandledMessage])

    driverActor ! Message
    driverActor ! InValidMessage
    driverActor ! Message
    driverActor ! PoisonPill
    driverActor ! Message
  }
}

class MainActor extends Actor {
  def receive = {
    case Message => {
      println("Message Received")
    }
  }
}

class DeadActorListener extends Actor {
  def receive = {
    case u: UnhandledMessage => println("Unhandled message " + u.message)
  }
}

You can stop an actor either by sending it a PoisonPill or by calling system.stop(driverActor). PoisonPill is a special kind of message that is handled automatically by the Akka framework. Other special messages include Kill, Terminated, Identify, ActorSelectionMessage, etc. The following code from ActorCell.scala handles these special message types:

def autoReceiveMessage(msg: Envelope): Unit = {
    if (system.settings.DebugAutoReceive)
      publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))

    msg.message match {
      case t: Terminated              => receivedTerminated(t)
      case AddressTerminated(address) => addressTerminated(address)
      case Kill                       => throw new ActorKilledException("Kill")
      case PoisonPill                 => self.stop()
      case sel: ActorSelectionMessage => receiveSelection(sel)
      case Identify(messageId)        => sender() ! ActorIdentity(messageId, Some(self))
    }
  }

Calling stop() causes the actor to shut itself down and stop processing its message queue. Any message sent to an actor after it has been marked dead is treated as a special case called a DeadLetter. We can extend our listener actor to also listen for DeadLetter messages.

import akka.actor.{Actor, ActorSystem, DeadLetter, PoisonPill, Props, UnhandledMessage}

case object Message
case object InValidMessage

object ActorsInAction {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ActorSystem")
    val driverActor = system.actorOf(Props[MainActor], "mainActor")
    val listener = system.actorOf(Props[DeadActorListener], "deadActor")
    system.eventStream.subscribe(listener, classOf[UnhandledMessage])
    system.eventStream.subscribe(listener, classOf[DeadLetter])

    driverActor ! Message
    driverActor ! InValidMessage
    driverActor ! Message
    driverActor ! PoisonPill
    driverActor ! Message
  }
}

class MainActor extends Actor {
  def receive = {
    case Message => {
      println("Message Received")
    }
  }
}

class DeadActorListener extends Actor {
  def receive = {
    case u: UnhandledMessage => println("Unhandled message " + u.message)
    case d: DeadLetter => println("dead message " + d.message)
  }
}

Output :

Message Received
Message Received
Unhandled message InValidMessage
dead message Message
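
A DeadLetter carries more than the message: it also records the sender and the intended recipient, which helps when tracking down where a late message came from. The following is a minimal sketch along the lines of the example above; ShortLivedActor, DeadLetterDetailsListener and DeadLetterDetailsSketch are hypothetical names, and the exact paths printed depend on the actor system, so no output is shown.

```scala
import akka.actor.{Actor, ActorSystem, DeadLetter, PoisonPill, Props}

case object Message

// Hypothetical actor that is stopped before its last message arrives.
class ShortLivedActor extends Actor {
  def receive: Receive = {
    case Message => println("Message Received")
  }
}

// Hypothetical listener that prints every field of the DeadLetter.
class DeadLetterDetailsListener extends Actor {
  def receive: Receive = {
    // DeadLetter is a case class of (message, sender, recipient).
    case DeadLetter(message, from, to) =>
      println(s"dead message $message from ${from.path} to ${to.path}")
  }
}

object DeadLetterDetailsSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ActorSystem")
    val shortLived = system.actorOf(Props[ShortLivedActor], "shortLived")
    val listener = system.actorOf(Props[DeadLetterDetailsListener], "deadLetterListener")
    system.eventStream.subscribe(listener, classOf[DeadLetter])

    shortLived ! Message
    shortLived ! PoisonPill
    // Sent after the PoisonPill, so it is expected to end up as a DeadLetter.
    shortLived ! Message
  }
}
```

Because the messages here are sent from main rather than from inside an actor, there is no real sender, so the sender reported in the DeadLetter is Akka's own dead letters reference.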