Blending Cucumber, Cassandra and Akka-Http

Knoldus

Folks,

Knoldus has always been at the forefront of exploring the best ways to use cutting-edge technologies. Recently, one of our teams did exactly that by integrating Cucumber with Akka-Http, Cassandra and, of course, Scala. In this blog, we explain and show how this can be done.

Cucumber

Cucumber is a tool for Behaviour-Driven Development (BDD). The approach in Cucumber is to describe the behaviour of the application in plain language and then run those descriptions as acceptance tests.

Akka-Http

Akka-Http is a general-purpose toolkit provided by Akka for implementing HTTP services. It supports both client-side and server-side APIs.

Cassandra

Cassandra is a distributed database that provides high scalability and availability with excellent performance.


Building Apache Spark with Scala 2.10

As we know, Spark is a Map-Reduce-like cluster computing framework designed to make data analytics fast.

On the official Spark website, the latest release is Spark 0.7.3, which requires Scala 2.9.3. If you are using Scala 2.10, this release will not work.

Since Spark has not announced a Scala 2.10-compatible release yet, to build Spark with Scala 2.10 you have to download the latest sources from here: Spark-Scala 2.10.

The installation instructions are the same as for the previous release.

Since the Spark API for this release is not available in any repository, if you want to use it in your project you need to:

1) Go to Spark directory

2) Run sbt compile publish-local

3) Add
libraryDependencies += "org.spark-project" %% "spark-core" % "0.8.0-SNAPSHOT"
in the build.sbt of your Scala project.
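Putting the pieces together, a minimal build.sbt for a project consuming the locally published snapshot might look like this (the project name and exact Scala patch version are illustrative):

```scala
// Minimal sketch of build.sbt; name and scalaVersion are illustrative
name := "spark-scala210-demo"

scalaVersion := "2.10.0"

// resolved from ~/.ivy2/local after `sbt compile publish-local`
libraryDependencies += "org.spark-project" %% "spark-core" % "0.8.0-SNAPSHOT"
```

sbt resolves locally published artifacts from the local Ivy repository by default, so no extra resolver is needed.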

Bridging the Communication between Lift Comet Actor and Akka Actor in Scala

In this blog, I will explain how to create a bridge between a Lift Comet Actor and an Akka Actor.
Since Lift has its own actor model, a bridge is needed to fill the communication gap between LiftComet and Akka actors, so that an Akka actor can send messages to the comet once the comet has been set up.

This bridging concept is most useful for web applications that are built in Lift, use Akka actors for message passing, and run in a distributed environment.

I have created a small Lift login application with Things to Do and chat functionality. You can find the full source code here on GitHub:

You will find the bridge concept in the Chat module.

class BridgeActor extends Actor {
  // the comet actor to which messages are forwarded
  private var target: Option[CometActor] = None
  def receive = {
    case comet: CometActor => target = Some(comet)    // register the comet
    case msg               => target.foreach(_ ! msg) // forward everything else
  }
}

/**
 * Controls the life-cycle of Actor Bridges
 */
object BridgeController extends Loggable {

  def getBridgeActor: ActorRef = {
    GlobalActorSystem.getActorSystem.actorOf(akka.actor.Props[BridgeActor])
  }
}

The BridgeController above provides a BridgeActor on request from the Chat comet. The comet gets registered as the forwarding target, and the Akka actor then sends messages to this forwarding target.

The concept becomes clearer from the lines below.

In Chat.scala:

/*Requesting Bridge Controller to provide BridgeActor*/
private lazy val bridge: ActorRef = BridgeController.getBridgeActor

  override def localSetup {
    bridge ! this
    val future = akka.pattern.ask(alertManager, Subscribe(bridge, User.currentUser.open_!.id.is))
    val result = Await.result(future, timeout.duration).asInstanceOf[List[(ObjectId, String)]]
    chats = result
    super.localSetup
  }

  override def localShutdown {
    alertManager ! Unsubscribe(bridge)
    bridge ! akka.actor.PoisonPill
    super.localShutdown
  }

We need to instantiate the bridge when setting up the comet; this registers the comet as the forwarding target of the BridgeActor.
We also need to kill the bridge actor when the comet shuts down, which we do by sending it a PoisonPill.

This is a working example and is most useful for remote applications.
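To see the forwarding idea in isolation, here is a hypothetical, Akka-free sketch of the BridgeActor behaviour. The Bridge class and the plain callback standing in for the comet are illustrative only, not part of the application above:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical sketch: the bridge remembers one target (here a plain
// callback standing in for the comet) and forwards all other messages to it.
class Bridge {
  private var target: Option[String => Unit] = None

  def receive(msg: Any): Unit = msg match {
    case register: (String => Unit) @unchecked => target = Some(register)
    case text: String                          => target.foreach(_(text))
    case _                                     => // ignore anything else
  }
}

val received = ListBuffer.empty[String]
val bridge   = new Bridge
bridge.receive("dropped")                    // no target yet, so dropped
bridge.receive((s: String) => received += s) // register the target
bridge.receive("hello comet")                // forwarded to the target
println(received.toList)
```

The real BridgeActor follows the same shape, except that registration and forwarding happen via actor messages instead of direct method calls.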

Please provide your valuable feedback.

A Simple Remote Chat application using Lift Comet and Akka Actor in Scala

If you are running an application and there is a good chance of heavy traffic in the future, a single server will not be able to handle it.
In that case you have two choices:
A) Replace the existing server with a more advanced server with a better configuration
B) Build a global node in the cluster to be used by the other nodes

We know that the first option is not a permanent solution, so let's move on to the second.

I have created a login application with Things To Do functionality using Lift in Scala.
To this application I have added a chat module using a LiftComet actor and an Akka actor.

In the chat module, I have implemented cluster functionality so that the application can be accessed remotely.
The idea is that there is one Akka actor on a central server, and this central actor is used by the other comet actors, whether they are on the same server or on different servers.

Since Lift has its own actor model, I have created a bridge between LiftComet and the Akka actor, so that the Akka actor can send messages to the comet once the comet has been set up.

To learn the bridging concept in depth, go here: http://riteofcoding.blogspot.in/2011/05/beyond-chat-lift-comet-and-akka.html

In this application, there is one controller:

object CometAlertController extends Loggable {

This controller gives a reference to the central actor if it exists; otherwise it creates a new central actor. Here I have created the central actor with a hard-coded configuration;
you can change this to suit your application.

  private def getOrCreateActor: ActorRef = {
    try {
      val actorRef = GlobalActorSystem.getActorSystem.actorFor("akka://Node@" + hostAddress + ":3557" + "/user/Manager")
      val future = akka.pattern.ask(actorRef, Ping())
      // Use Await.result, if blocking is necessary
      // Use mapTo for non-blocking scenario
      val result = Await.result(future, timeout.duration).asInstanceOf[Ping]
      actorRef
    } catch {
      case e: TimeoutException =>
        createActor
    }
  }

As you can see in the method above, the controller first pings the central actor by sending a Ping message.
If the actor exists, it sends a reply; otherwise a TimeoutException is thrown. If the TimeoutException is caught in the catch block, the controller creates a new central actor.
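The ping-then-create decision can be sketched without actors. The helper below is hypothetical and only mirrors the control flow of getOrCreateActor: look up a reference, ping it, and fall back to creating a fresh one on timeout.

```scala
import scala.concurrent.TimeoutException

// Hypothetical, actor-free sketch of getOrCreateActor's control flow:
// ping the looked-up resource; if the ping times out, create a fresh one.
def getOrCreate[A](lookup: => A, ping: A => Unit, create: => A): A =
  try {
    val existing = lookup
    ping(existing) // throws TimeoutException when nobody answers
    existing
  } catch {
    case _: TimeoutException => create
  }

// The looked-up reference is dead, so the fallback kicks in.
val ref = getOrCreate[String](
  lookup = "dead-ref",
  ping   = _ => throw new TimeoutException("no reply"),
  create = "fresh-ref")
println(ref) // prints "fresh-ref"
```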


  private def createActor = {
    val remoteSystem = ActorSystem("Node", ConfigFactory.load(configuration))
    val result = remoteSystem.actorOf(Props[AkkaAlertActor], "Manager")
    result
  }


  def getManager: ActorRef = getOrCreateActor

There is GlobalActorSystem.scala, which creates a single actor system for the whole application.

For the chat module, there is Chat.scala, which is the comet actor; AkkaAlertActor is the Akka actor.
The comet actor and the Akka actor communicate with each other through the BridgeActor, which works as a bridge between the comet and Akka.

Since actors in a remote application pass messages over the network, those messages must be serialized. To serialize them, I have used Akka-Kryo-Serialization.
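For reference, the serializer is wired up in application.conf roughly as below. This is a sketch: the serializer class name follows the romix project's README, and the bound message class is a hypothetical placeholder for your own chat messages.

```
akka {
  actor {
    serializers {
      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
    }
    serialization-bindings {
      # bind your own message classes (or a marker trait) to kryo
      "com.example.ChatMessage" = kryo
    }
  }
}
```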

How to run the application:

A) Clone code from https://github.com/romix/akka-kryo-serialization.

B) Run sbt compile publish-local

C) Now clone https://github.com/knoldus/Lift_UserLogin_Template

D) Run sbt ~container:start

How to test in a remote environment:

A) Open a terminal and run sbt ~container:start.

B) Open build.sbt and change

port in container.Configuration := 8081

C) Open another terminal and run sbt ~container:start.

D) Register yourself, log in, and enjoy the chat.

You will be able to chat remotely. Check it out and let me know your feedback; it would be highly appreciated.

Easiest Way to Recover Akka Actor for Remote Application JVM Failure Scenario in Scala

In this blog, I will explain how easily you can recover an Akka actor in a remote-application JVM-failure scenario in Scala.

I have implemented this in one of my projects and it is working fine.

Suppose there is a remote application used by multiple JVMs, and the JVMs pass messages to each other through an Akka actor. Then
there must be an Akka actor recovery mechanism to prevent the application from crashing.

For example:
There are three JVMs accessing a remote application. All of them use the same Akka actor reference to pass messages to each other.
That actor could have been created by any of the JVMs; the others access it by its actor path.

If JVM1, the creator of the actor, shuts down for any reason, the application will crash because the other JVMs are still using the same actor, unless we have an Akka actor recovery mechanism in place.

This is just one scenario; there are many other ways a remote application can crash.

Now let's come back to the scenario explained above.

We have one controller, which creates an actor on request from any JVM if the actor does not exist, or returns a reference to the existing actor.

def getOrCreateActor(): ActorRef = {
  // Write the logic here for
  // creating an actor, if the actor does not exist,
  // or returning a reference to the existing actor
}
/**
 * Retry support for an Akka actor
 */
def retry(func: => ActorRef, message: Any, maxAttempts: Int, attempt: Int): Option[Any] = {
  try {
    val future = akka.pattern.ask(func, message)
    // Use Await.result if blocking is necessary;
    // use mapTo for a non-blocking scenario
    val result = Await.result(future, timeout.duration)
    Some(result)
  } catch {
    case e: TimeoutException =>
      // recurse with the same function so a fresh actor can be created
      if (attempt <= maxAttempts) retry(func, message, maxAttempts, attempt + 1)
      else None
  }
}

When the Akka future fails to respond, this retry logic kicks in automatically and keeps retrying until a result is obtained or the retry limit is exhausted.
If we do not provide a limit, it will retry indefinitely until a result is obtained.

A sample call to this method :-

retry(getOrCreateActor, message, 10, 0)

As you can see, in the “retry” method above, we pass a function that creates the Akka actor as the first parameter, instead of passing a particular actor.

The reason is:

If an actor dies for any reason, this logic still works: it creates a new actor to prevent the application from crashing and tries to receive the result from the newly created actor.

Note: we should always avoid Await.result unless we have a blocking scenario; we can use mapTo for non-blocking scenarios.

In my case I am using Await.result, because there is a chance the failed JVM will be repaired after some time, in which case we might not need to create a new actor.
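The retry behaviour described above can be exercised without Akka at all. The sketch below is hypothetical: it replaces the actor ask with a plain by-name block that times out twice before answering, but keeps the same shape of re-evaluating the function on every attempt.

```scala
import scala.concurrent.TimeoutException

// Hypothetical, actor-free sketch of the retry loop: `func` is
// re-evaluated on every attempt, so a dead resource can be replaced
// by a fresh one before the next try.
def retry[A](func: => A, maxAttempts: Int, attempt: Int = 0): Option[A] =
  try Some(func)
  catch {
    case _: TimeoutException =>
      if (attempt < maxAttempts) retry(func, maxAttempts, attempt + 1)
      else None
  }

// Fails twice with a timeout, then answers on the third attempt.
var calls = 0
val result = retry({
  calls += 1
  if (calls < 3) throw new TimeoutException("no reply")
  "pong"
}, maxAttempts = 10)
println(result) // prints "Some(pong)"
```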

If you have tried a better approach in terms of code or performance, please suggest it here; I will try to implement it.

Your suggestions would be highly appreciated.

A simple way to implement Retry Support For Akka Future in Scala

In this blog, I explain a simple way to implement retry support for Akka futures in Scala.
This implementation helps when an Akka actor dies for any reason and we still need a response from the Akka future.

Here is the logic:

object RetrySupport {

  def ask(func: => ActorRef, message: Any, maxAttempts: Int, attempt: Int): Option[Any] = {
    try {
      val future = akka.pattern.ask(func, message)
      val result = Await.result(future, timeout.duration)
      Some(result)
    } catch {
      case e: TimeoutException =>
        // without the `else`, the recursive call's result would be
        // discarded and None returned on every timeout
        if (attempt <= maxAttempts) ask(func, message, maxAttempts, attempt + 1)
        else None
    }
  }
}

When the Akka future fails to respond, this retry logic kicks in automatically and keeps retrying until a result is obtained or the retry limit is exhausted.
If we do not provide a limit, it will retry indefinitely until a result is obtained.

As you can see, in the “ask” method above, we pass a function that creates the Akka actor as the first parameter, instead of passing a particular actor.

The reason is:

If an actor dies for any reason, this logic still works: it creates a new actor and tries to receive the result from the newly created actor.

If you have implemented a better approach, please suggest it.

Any suggestions would be appreciated.