Spark integration

Introduction

While HFactory Tools does not offer Spark integration out of the box, the HTalk DSL and the strongly-typed HFactory entities can be used within Spark computations.

Packaging

In order to make entities and HTalk available for use in Spark RDDs, the jars containing them must be sent to the Spark cluster workers.

Keeping your app’s entities and entity registry in a separate datamodel subproject helps reduce clutter: the subproject’s assembly jar (myapp-datamodel, if your app is called myapp) only brings in the required dependencies. For instance, there is no need to ship the app’s REST interface to the worker nodes.

Similarly, if you intend to stream messages (and not only actual data points) to Spark, you should define these messages either within the datamodel subproject or in another dedicated subproject, again to keep dependencies to a minimum; in the latter case, the jar of that other subproject must of course also be sent to the workers.
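
Assuming an sbt build, a rough sketch of the corresponding build.sbt could look as follows. The project names, the hfactory-core coordinates and the messages subproject are illustrative and should be adapted to your own build:

          // build.sbt (sketch): keep the data model and the streaming messages in
          // small subprojects so that their assembly jars stay lean.
          lazy val datamodel = (project in file("myapp-datamodel"))
            .settings(
              name := "myapp-datamodel",
              // needed to compile the entities; group id and version are illustrative
              libraryDependencies += "com.hfactory" %% "hfactory-core" % "x.y.z" % "provided"
            )

          lazy val messages = (project in file("myapp-messages"))
            .settings(name := "myapp-messages")   // plain message classes, no heavy dependencies

          lazy val rest = (project in file("myapp-rest"))
            .dependsOn(datamodel)                 // the REST layer is never shipped to the workers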

Actor-based data streaming

Spark offers several ways to “inject” data into a stream. Since HFactory uses Akka, the easiest way to stream data from an HFactory app is to use Spark’s ActorHelper: mixing this trait in an actor gives it access to the Spark streaming API. Then it is only a matter of spawning that actor using Spark’s StreamingContext.actorStream(). Please refer to the Spark documentation for more information.
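
A minimal sketch of such a streaming job, written against the Spark 1.x streaming API (the DataPoint message type, the application name and the batch interval are made up for the example):

          import akka.actor.{Actor, Props}
          import org.apache.spark.SparkConf
          import org.apache.spark.streaming.{Seconds, StreamingContext}
          import org.apache.spark.streaming.receiver.ActorHelper

          // Hypothetical message type sent by the HFactory app.
          case class DataPoint(sensor: String, value: Double)

          // Mixing ActorHelper into an actor provides store(), which pushes
          // the received messages into the Spark stream.
          class InjectionActor extends Actor with ActorHelper {
            def receive = {
              case dp: DataPoint => store(dp)
            }
          }

          object StreamingJob {
            def main(args: Array[String]): Unit = {
              val conf = new SparkConf().setAppName("myapp-streaming")
              val ssc  = new StreamingContext(conf, Seconds(10))

              // Spark spawns the injection actor on one of its workers and turns
              // everything it store()s into a DStream[DataPoint].
              val stream = ssc.actorStream[DataPoint](Props[InjectionActor], "myapp-injector")
              stream.foreachRDD(rdd => println(s"received ${rdd.count()} data points"))

              ssc.start()
              ssc.awaitTermination()
            }
          }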

NOTE: Spark may spawn the “injection actor” on any node of its choosing and unfortunately does not tell you which node it is! You will therefore have to make the actor publish its location somehow, for instance by having it send a message to another actor whose address is well known (e.g. an actor in your app, running on the node hosting the HFactory server).
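
A possible sketch of that trick; the registration message, the actor names and the well-known address below are all placeholders to adapt to your deployment:

          import akka.actor.{Actor, ActorRef}
          import org.apache.spark.streaming.receiver.ActorHelper

          // Hypothetical registration message, defined in a jar shared by the app
          // and the Spark job (e.g. the datamodel or messages subproject).
          case class InjectorLocation(ref: ActorRef)

          class LocatableInjectionActor extends Actor with ActorHelper {
            // Well-known actor running inside the HFactory app; host, port and
            // path are placeholders.
            private val registry = context.actorSelection(
              "akka.tcp://hfactory@app-host:2552/user/myapp/injector-registry")

            // Announce our own address as soon as Spark has spawned us.
            override def preStart(): Unit = registry ! InjectorLocation(self)

            def receive = {
              case msg => store(msg)
            }
          }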

Entity serialization

To make the entity registry and the entities it contains accessible to RDDs, they must be shipped to the worker nodes and must therefore be serializable.

The easiest way to do this is to make a singleton instance of the registry (note that the registry itself is necessarily a class, as required by HFactory apps), like so:

          object MyAppRegistry extends MyAppRegistry

Making the entities accessible from an RDD is then simply a matter of importing that object in the scope of the RDD:

          import MyAppRegistry._
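
For example, given a SparkContext sc, a sketch with a hypothetical Measurement entity and CSV parsing logic (adapt the names and fields to your own data model):

          import MyAppRegistry._   // the registry singleton and its entities

          // Hypothetical: turn each CSV line into a strongly-typed entity.
          val measurements = sc.textFile("hdfs:///data/measurements.csv").map { line =>
            val Array(sensor, value) = line.split(',')
            Measurement(sensor, value.toDouble)
          }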

Submitting a Spark job

The myapp-datamodel jar must be provided on the command line of the spark-submit command, along with any other jar required by your RDD. Most notably, since an app’s jars do not (and should not!) include the HFactory classes, the hfactory-core jar must also be provided on the command line.

However, since these jars may depend on other jars, it would be painful to determine the full set of jars “by hand”. A way to avoid this is to define a Spark-specific subproject (myapp-spark, say) in your app that declares only the top-level dependencies myapp-datamodel and hfactory-core: building the assembly of that subproject will then automatically pull in all the required transitive dependencies.
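
A build.sbt sketch of such a subproject (the hfactory-core coordinates and all versions are illustrative):

          // myapp-spark: only the two top-level dependencies are declared; building
          // the assembly pulls in everything they need transitively.
          lazy val spark = (project in file("myapp-spark"))
            .dependsOn(datamodel)                       // i.e. myapp-datamodel
            .settings(
              name := "myapp-spark",
              libraryDependencies ++= Seq(
                "com.hfactory"     %% "hfactory-core" % "x.y.z",
                "org.apache.spark" %% "spark-core"    % "1.x" % "provided"  // provided by the cluster
              )
            )

The resulting assembly jar is then the one passed to spark-submit, for example: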

            spark-submit --master yarn-client \
              --conf akka.actor.provider=akka.remote.RemoteActorRefProvider \
              --conf spark.myapp.inputFormat=CSV \
              --conf spark.myapp.inputFile=test.csv \
              --class com.example.myapp.MyApp \
              myapp-spark-assembly-1.0.0.jar
  • akka.actor.provider=akka.remote.RemoteActorRefProvider makes the Akka actor system within Spark able to communicate with other actor systems. This is required for communication between the app (running on the HFactory server) and the RDD (running on the Spark workers).

  • Spark only passes on configuration settings that start with spark. and silently drops all other settings. Hence, if your RDD requires some configuration to be passed, a simple trick is to have its configuration settings start with spark.myapp., where myapp is the name of your application (provided this does not conflict with standard Spark settings). In our example, the app uses the settings spark.myapp.inputFormat and spark.myapp.inputFile, which the job reads back from its SparkConf as sketched below.
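
Inside the job, these settings can then be read back from the SparkConf, for instance:

          import org.apache.spark.SparkConf

          val conf        = new SparkConf()                              // picks up the --conf settings
          val inputFormat = conf.get("spark.myapp.inputFormat", "CSV")   // with a default value
          val inputFile   = conf.get("spark.myapp.inputFile")            // fails fast if missing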

For a detailed description of spark-submit and its options (number of executors and cores, memory settings, etc.), please refer to the Spark documentation.