Handling Events from a Service running in a Task in an Akka.NET actor

I'm using the Prism framework for my WPF application. I have a producer service that runs in a Task and raises a CompositePresentationEvent when a file is found. I have an Akka actor that subscribes to the event. The actor's handler is simple; it looks like this:

private void HandleFileReceive(FileEventArgs args)
{
    Self.Tell(new FileReceived(args.File));
}

When the event hits the handler above, I receive a System.NotSupportedException with this message: "There is no active ActorContext, this is most likely due to use of async operations from within this actor".

I assume this is because the service is running in a different thread than the actor's handler. Is there a way to handle this type of thing in Akka.NET?

I'm not completely opposed to writing new Actors that do the job of the Services that are needed for my situations. The issue is that, depending on some settings in a file, the service will be different. Currently, I'm handling this using MEF and getting the correct implementer of my given interface from the IoC container. I'd like to continue to keep the concrete implementations of the producers abstracted from the core code (where the actors are).

Any suggestions on getting past this (perceived) threading issue and/or dynamically generating a ProducerActor that implements a given interface?

Thanks

-g



Solution 1:[1]

I ended up creating an actor to handle the response retrieval and changing the producer interface so the concrete producers only need to implement a single, simple method. The old producers were in charge of waiting a config-driven amount of time before checking for responses, but the new code uses Akka's scheduler to tell the retriever, at the same config-driven interval, when to check for responses. I've provided a sample below; my real code is a little more complex, with error handling and such.

In the code below, you see the interface for IProducer and the concrete implementations: LocalProducer and EmailProducer. In the real code, these have attributes that tell the container some other information that is used to get the correct implementation from the container. The ConsumerActor is the parent actor for this scenario and handles the Consume message. It creates a ResponseRetrieverActor in its constructor and schedules a repeated tell for the ResponseRetrieverActor so that the retriever checks for responses at a given interval.

The IoC magic happens in the ResponseRetrieverActor's RetrieveResponses handler. I get the transport type from a factory method (not shown here; it is actually set in a config file of sorts). The transport type is used to retrieve the correct producer from the IoC container, using the aforementioned attributes on the concrete producers. Finally, the producer is used to get the responses, and the retriever tells its parent, which is the consumer, about each file in the list.

public interface IProducer
{
   List<string> GetResponses();
}

[Export(typeof(IProducer))] // Other attributes needed for MEF Export to differentiate the multiple IProducer implementations
public class LocalProducer : IProducer
{
   public List<string> GetResponses()
   {
       // get files from a directory
       return new List<string>(); // placeholder so the sample compiles
   }
}

[Export(typeof(IProducer))]
public class EmailProducer : IProducer
{
   public List<string> GetResponses()
   {
       // get files from email account
       return new List<string>(); // placeholder so the sample compiles
   }
}

public class ConsumerActor : ReceiveActor
{
    public class Consume
    {
       public Consume(string file) { this.File = file; }

       public string File { get; private set; }
    }

    public ConsumerActor()
    {
       // The retriever is a child, so it can reach this actor through Context.Parent.
       _retriever = Context.ActorOf(Props.Create<ResponseRetrieverActor>(), "retriever");

       // Interval in milliseconds (config-driven in the real code).
       var interval = 10000;
       // No initial delay, then tell the retriever to check for responses every interval.
       Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(0, interval, _retriever, new ResponseRetrieverActor.RetrieveResponses(), Self);
       Start();
    }

    private void Start()
    {
        Receive<Consume>(msg =>
        {
           // do something with the msg.File
        });
    }

    private IActorRef _retriever;
}

public class ResponseRetrieverActor : ReceiveActor
{
  public class RetrieveResponses { }

  public ResponseRetrieverActor() { Start(); }

  private void Start()
  {
    Receive<RetrieveResponses>(msg => HandleRetrieveResponses());
  }

  private void HandleRetrieveResponses()
  {
     var transportType = TransportFactory.GetTransportType(); // Gets transport protocol for the producer we need to use (Email, File, etc.)
     var producer = ServiceLocator.Current.GetInstance<IProducer>(transportType); // Gets a producer from the IoC container for the specified transportType

     var responses = producer.GetResponses();

     // Runs inside the message handler, so Context is valid here.
     foreach (var response in responses)
     {
         Context.Parent.Tell(new ConsumerActor.Consume(response));
     }
  }
}
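
The scheduling call returns an ICancelable, which the sample above discards. A minimal sketch of keeping that handle and cancelling it when the actor stops, so that ticks stop arriving once the actor is gone. PollingActor and Tick are hypothetical names for illustration and are not part of the original code:

```csharp
using System;
using Akka.Actor;

// Hypothetical actor that polls itself at a fixed interval.
public class PollingActor : ReceiveActor
{
    public sealed class Tick { }

    private readonly ICancelable _schedule;

    public PollingActor(TimeSpan interval)
    {
        // Keep the handle returned by the scheduler so it can be cancelled later.
        _schedule = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
            TimeSpan.Zero, interval, Self, new Tick(), Self);

        Receive<Tick>(_ =>
        {
            // check for responses here
        });
    }

    protected override void PostStop()
    {
        // Stop the repeating message once the actor has stopped.
        _schedule.Cancel();
        base.PostStop();
    }
}
```

The same pattern would apply to the ConsumerActor above: store the result of ScheduleTellRepeatedlyCancelable in a field and cancel it in PostStop.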

Solution 2:[2]

I had a similar issue: I wanted to start a process that runs a command-line app from an actor and capture its output, which is done by handling process.OutputDataReceived.

What I ended up doing was using a custom stack that the handler pushes messages onto:

process.OutputDataReceived += (sender, e) => Output.Push(e.Data);

The custom stack looks like this:

class OutputStack<T> : Stack<T>
{
   public event EventHandler OnAdd;

   // Stack<T>.Push is not virtual, so 'new' makes the hiding explicit.
   public new void Push(T item)
   {
       base.Push(item);
       if (null != OnAdd) OnAdd(this, EventArgs.Empty);
   }
}

Then, in the constructor, I just handle the OnAdd event of my custom stack:

OutputStack<string> Output = new OutputStack<string>();
Output.OnAdd += (sender, e) =>
{
    if (Output.Count > 0)
    {
        var message = Output.Pop();
        actor.Tell(new LogActor.LogMessage(message));
    }
};

It is a little hacky but it works and I get to send the message as soon as it happens (and gets handled). Hopefully I will get to fix it in the future...
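
A handler like this can probably call Tell from the process's output thread because `actor` is an ordinary IActorRef captured by the closure, whereas Self, Sender and Context are only valid while the actor itself is being constructed or is processing a message. A minimal sketch of the same idea without the stack, assuming a hypothetical FileProducer that raises a plain .NET event (FileProducer, FileReceived and FileActor are illustrative names, not from the original code):

```csharp
using System;
using Akka.Actor;

// Hypothetical producer that raises a plain .NET event, possibly from another thread.
public class FileProducer
{
    public event Action<string> FileFound;
    public void Raise(string file) => FileFound?.Invoke(file);
}

// Hypothetical message carrying the file that was found.
public sealed class FileReceived
{
    public FileReceived(string file) { File = file; }
    public string File { get; }
}

public class FileActor : ReceiveActor
{
    public FileActor(FileProducer producer)
    {
        // Self is resolved through the current actor context, so copy the
        // reference while the context is active and let the closure capture
        // the copy instead of reading Self on the producer's thread.
        var self = Self;
        producer.FileFound += file => self.Tell(new FileReceived(file));

        Receive<FileReceived>(msg =>
        {
            // msg.File is handled here, inside the actor
        });
    }
}
```

Unsubscribing from the event in PostStop would be a sensible addition, so the producer does not keep delivering to a stopped actor.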

Solution 3:[3]

Not sure whether this solves the OP's problem, but when I searched for "There is no active ActorContext, this is most likely due to use of async" I ended up here, so I'll put the easy solution for my problem here:

If you await an async call in a handler, that handler has to be async. If an async handler is registered with Receive<> instead of ReceiveAsync<>, the above exception occurs. ReceiveAsync<> suspends the actor's mailbox until the async handler has finished.
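
A minimal sketch of an async handler registered with ReceiveAsync<>. FetchLength and LengthActor are hypothetical names, and Task.Delay stands in for whatever asynchronous call the real handler awaits:

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;

// Hypothetical request message, for illustration only.
public sealed class FetchLength
{
    public FetchLength(string text) { Text = text; }
    public string Text { get; }
}

public class LengthActor : ReceiveActor
{
    public LengthActor()
    {
        // Registered with ReceiveAsync, so the actor context is still
        // available when the handler resumes after the await.
        ReceiveAsync<FetchLength>(async msg =>
        {
            await Task.Delay(50); // stands in for real asynchronous work
            Sender.Tell(msg.Text.Length);
        });
    }
}
```

Because the mailbox is suspended while the handler runs, the actor still processes one message at a time; long awaits therefore delay every message queued behind them.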

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Glen Skinner
Solution 2 Vasilis Sakellarios
Solution 3 Batox