Enabling Site-Aware Scheduling for Apache Storm in ExoGENI

Overview

Apache Storm is a free and open-source distributed real-time computation system designed for processing streaming data. It allows users to easily define and execute topologies. Storm is task-parallel and follows the master/slave paradigm. For more details about Storm, please see Storm’s documentation.

This post will introduce how to enable site-aware scheduling for Storm in ExoGENI in order to take advantage of ExoGENI’s globally distributed sites. We will walk you through an example running on a multi-site ExoGENI slice to give you insight into the approach.

Storm Basics

In Storm, users execute topologies on a Storm cluster. A topology consists of components called spouts and bolts as shown in Figure 1. A spout acts as a data source and a bolt is a computation process. Spouts and bolts are connected with built-in or custom grouping schemas. Serialized data, referred to as tuples, are transferred between spouts and bolts.

Figure 1: Storm Topology

A Storm cluster provisions computing resources for executing submitted topologies. Nimbus is the master node; it accepts topology submissions from users and dispatches tasks to the slave nodes. Supervisors are the slave nodes that carry out the data processing. Storm employs Apache ZooKeeper for fault recovery and coordination between Nimbus and the supervisors. Each supervisor’s computing resources can be partitioned into multiple Worker Slots. In each Worker Slot, Storm can spawn multiple threads, referred to as Executors, which execute the tasks assigned by Nimbus.

Figure 2: Storm cluster

Problem

In Storm, Nimbus uses a scheduler to assign tasks to the supervisors. The default scheduler aims to allocate computing resources evenly among topologies. It works well in terms of fairness among topologies, but it makes it impossible for users to predict the placement of topology components in the Storm cluster. In a deployment spanning multiple sites, however, it can be very important to let users assign a particular topology component to a supervisor located at a specific site. Therefore, we explore an approach to address this problem.

Pluggable Scheduler

Since the 0.8.0 release, Storm has allowed users to plug in a custom scheduler in order to apply a custom scheduling policy. The custom scheduler is required to implement the IScheduler interface, which contains two methods – prepare(Map conf) and schedule(Topologies topologies, Cluster cluster). The custom scheduling policy is implemented in the schedule(Topologies topologies, Cluster cluster) method. In addition, Storm reserves a field in the supervisor’s configuration, named supervisor.scheduler.meta, in which users can specify arbitrary scheduling metadata as “key-value” pairs to facilitate scheduling. The metadata can be retrieved by calling the getSchedulerMeta() method of the corresponding SupervisorDetails instance. Now let’s go through the steps in detail.
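
For orientation, a bare-bones custom scheduler looks roughly like the sketch below (the class name is just a placeholder; the imports assume Storm 0.9.x, where the scheduler API lives under the backtype.storm.scheduler package). The full site-aware implementation is developed step by step in the Solution section.

import java.util.Map;

import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.IScheduler;
import backtype.storm.scheduler.Topologies;

public class MyCustomScheduler implements IScheduler {

    @Override
    public void prepare(Map conf) {
        // Called once when Nimbus initializes the scheduler; the Storm
        // configuration can be cached here if needed.
    }

    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        // The custom scheduling policy goes here: inspect the submitted
        // topologies and the cluster state, then call cluster.assign(...)
        // to place executors on worker slots.
    }
}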

ExoGENI Slice

An ExoGENI slice can describe a number of VMs deployed over multiple geographical sites and connected with user-defined network links. This facilitates the development of a site-aware scheduling mechanism for Storm, as well as more sophisticated scheduling algorithms based on site information. In this post, we present the example on an ExoGENI slice spanning the TAMU, UH and UFL sites. Let’s set up the slice first.

VM Image

A VM image based on CentOS 6.3 has been created for the example. The image has all the dependencies for a Storm deployment installed, including OpenJDK, Storm, ZooKeeper, ZeroMQ and jzmq. The image information is listed below. You need to append this information to your .flukes.properties file to make the image visible in Flukes.

image1.name: Storm-SiteAware-v0.2
image1.url: http://geni-images.renci.org/images/dcvan/storm/Storm-SiteAware-v0.2/Storm-SiteAware-v0.2.xml
image1.hash: 047baee53ecb3455b8c527f064194cad9a67771a

Slice and Post-Bootscript

We also created an example slice request. The topology of the slice is shown in Figure 3.

Figure 3: Slice topology

The slice is composed of a Nimbus node, a ZooKeeper node and 3 supervisor nodes. Each node has a post-bootscript that takes care of the setup automatically. The post-bootscript of each supervisor node contains a snippet that detects which site the node is running on using the neuca-user-data command, as shown below. The site information is then appended to the supervisor’s configuration and used for scheduling later. The other parts of the post-bootscript are included in the slice request.

site=$(/usr/local/bin/neuca-user-data |grep -m1 physical_host|awk -F'[=-]' '{print $2}')
while [ ! "$site" ];do
  site=$(/usr/local/bin/neuca-user-data |grep -m1 physical_host|awk -F'[=-]' '{print $2}')
  sleep 1
done

The example slice has a working solution installed and is intended to serve as a reference. Now let’s walk through the approach step by step.

Solution

Now we have a Storm cluster with 3 supervisors deployed at the TAMU, UH and UFL sites in ExoGENI, respectively. Assume we also have a topology with 1 spout and 3 bolts. We want the topology components to be scheduled as shown in Figure 4 after submission.

Figure 4: Use case

Step 1: Tag Supervisors

As mentioned above, Storm offers a field in the supervisor’s configuration for users to specify custom scheduling metadata. In this case, we tag each supervisor with the name of the site it is running at. The configuration file is located at $STORM_HOME/conf/storm.yaml; append the following lines to it (shown here for the TAMU supervisor).

...
supervisor.scheduler.meta:
    site: "tamu"
...

Remember to restart the Storm daemons to pick up the new configuration. If you are using the example slice, the Storm daemons are managed by supervisord (the process manager, not to be confused with Storm supervisors), so you simply need to restart supervisord by typing the following line.

# service supervisord restart

Step 2: Tag Topology Components

This step is done when building the topology with TopologyBuilder in the topology’s main method. The ComponentConfigurationDeclarer interface has a method addConfiguration(String config, Object value) that allows adding custom configuration, i.e. metadata. In our case, we add the site information using this method.

public class ExampleTopology{
    public static void main(String[] args) throws Exception{
        ...
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout_1", new ExampleSpout1(), 1).addConfiguration("site", "tamu");
        builder.setBolt("bolt_1", new ExampleBolt1(), 1).shuffleGrouping("spout_1").addConfiguration("site", "tamu");
        builder.setBolt("bolt_2", new ExampleBolt2(), 1).shuffleGrouping("spout_1").addConfiguration("site", "uh");
        builder.setBolt("bolt_3", new ExampleBolt3(), 1).globalGrouping("bolt_1").globalGrouping("bolt_2").addConfiguration("site", "ufl");
        // submitTopology takes a topology-level configuration as its second argument
        StormSubmitter.submitTopology("example-topology", new Config(), builder.createTopology());
        ...
    }
}

Step 3: Customize Scheduler

This step is where the magic happens. Since we already have the site information on both the Storm cluster and the topology, we now need to match them up. As mentioned above, the scheduling logic is implemented in the schedule(Topologies topologies, Cluster cluster) method. The Topologies object wraps all the topologies submitted to the cluster; it holds a collection of TopologyDetails instances, each of which contains information about the corresponding topology. The Cluster object represents the Storm cluster and has a getSupervisors() method that returns a map of SupervisorDetails instances containing details about the supervisors. We then read the metadata using the getSchedulerMeta() method referred to above. At this point, the code should look as below.

public class SiteAwareScheduler implements IScheduler{

    private static final String SITE = "site";

    @Override
    public void prepare(Map conf) {}

    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        Collection<TopologyDetails> topologyDetails = topologies.getTopologies();
        Collection<SupervisorDetails> supervisorDetails = cluster.getSupervisors().values();
        Map<String, SupervisorDetails> supervisors = new HashMap<String, SupervisorDetails>();
        for(SupervisorDetails s : supervisorDetails){
            Map<String, String> metadata = (Map<String, String>)s.getSchedulerMeta();
            if(metadata.get(SITE) != null){
                supervisors.put(metadata.get(SITE), s);
            }
        }
    }
}

Note that the supervisors are indexed by their sites in a hash map in order to speed up supervisor lookup. Without this index, we would need to iterate over all the supervisors to find the one for each component, resulting in O(number of components in a topology × number of supervisors in the cluster) complexity. Because supervisors are relatively static compared to topology components, indexing them lowers the complexity to O(number of components in a topology) on average. The index stays up to date because Storm’s core calls the schedule(Topologies topologies, Cluster cluster) method periodically, so changes to the supervisors are reflected on the next scheduling round.

Then we need to iterate over the topologies to schedule them properly. Before looking at the topology components, we check whether a topology needs to be scheduled with needsScheduling(TopologyDetails topology). If the topology has not been scheduled yet, we move forward; otherwise, we simply skip to the next one. Details about the components are encapsulated in the StormTopology instance, which is obtained by calling the getTopology() method of each TopologyDetails instance. From the StormTopology instance we can get the SpoutSpec and Bolt instances that represent the spouts and bolts in the topology. The site metadata is stored in a JSON string returned by the get_json_conf() method, so we need a JSON parser to extract it. Here we use json-simple.

public class SiteAwareScheduler implements IScheduler{

    private static final String SITE = "site";

    @Override
    public void prepare(Map conf) {}

    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        Collection<TopologyDetails> topologyDetails = topologies.getTopologies();
        Collection<SupervisorDetails> supervisorDetails = cluster.getSupervisors().values();
        Map<String, SupervisorDetails> supervisors = new HashMap<String, SupervisorDetails>();
        JSONParser parser = new JSONParser();
        for(SupervisorDetails s : supervisorDetails){
            Map<String, String> metadata = (Map<String, String>)s.getSchedulerMeta();
            if(metadata.get(SITE) != null){
                supervisors.put(metadata.get(SITE), s);
            }
        }

        for(TopologyDetails t : topologyDetails){
            if(!cluster.needsScheduling(t)) continue;
            StormTopology topology = t.getTopology();
            Map<String, Bolt> bolts = topology.get_bolts();
            Map<String, SpoutSpec> spouts = topology.get_spouts();
            try{
                for(String name : bolts.keySet()){
                    Bolt bolt = bolts.get(name);
                    JSONObject conf = (JSONObject)parser.parse(bolt.get_common().get_json_conf());
                    if(conf.get(SITE) != null && supervisors.get(conf.get(SITE)) != null){
                        // the bolt will be assigned to the supervisor at this site in the next step
                        String site = (String)conf.get(SITE);
                    }
                }
                for(String name : spouts.keySet()){
                    SpoutSpec spout = spouts.get(name);
                    JSONObject conf = (JSONObject)parser.parse(spout.get_common().get_json_conf());
                    if(conf.get(SITE) != null && supervisors.get(conf.get(SITE)) != null){
                        // the spout will be assigned to the supervisor at this site in the next step
                        String site = (String)conf.get(SITE);
                    }
                }
            }catch(ParseException pe){
                pe.printStackTrace();
            }
        }
    }
}

Now that we have site metadata for each topology component, we can retrieve the corresponding supervisors from the index using the metadata. As introduced above, a supervisor consists of a number of worker slots, and a component usually occupies one of them rather than the entire supervisor. The available worker slots can be acquired by calling getAvailableSlots(SupervisorDetails supervisor) on the Cluster. On the other hand, a component may be composed of multiple executors, which are returned by getNeedsSchedulingComponentToExecutors(TopologyDetails topology) and can be distributed over different worker slots. For simplicity, we schedule all executors of a component onto the first available worker slot in this tutorial. Finally, assign(WorkerSlot slot, String topologyId, Collection executors) assigns the component’s executors to the chosen worker slot.

package storm.custom.scheduler;

// Imports for Storm 0.9.x (pre-1.0 "backtype" namespace) and json-simple.
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import backtype.storm.generated.Bolt;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.IScheduler;
import backtype.storm.scheduler.SupervisorDetails;
import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.WorkerSlot;

public class SiteAwareScheduler implements IScheduler{

    private static final String SITE = "site";

    @Override
    public void prepare(Map conf) {}

    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        Collection<TopologyDetails> topologyDetails = topologies.getTopologies();
        Collection<SupervisorDetails> supervisorDetails = cluster.getSupervisors().values();
        // Index the supervisors by site for fast lookup.
        Map<String, SupervisorDetails> supervisors = new HashMap<String, SupervisorDetails>();
        JSONParser parser = new JSONParser();
        for(SupervisorDetails s : supervisorDetails){
            Map<String, String> metadata = (Map<String, String>)s.getSchedulerMeta();
            if(metadata.get(SITE) != null){
                supervisors.put(metadata.get(SITE), s);
            }
        }

        for(TopologyDetails t : topologyDetails){
            // Skip topologies that are already fully scheduled.
            if(!cluster.needsScheduling(t)) continue;
            StormTopology topology = t.getTopology();
            Map<String, Bolt> bolts = topology.get_bolts();
            Map<String, SpoutSpec> spouts = topology.get_spouts();
            try{
                for(String name : bolts.keySet()){
                    Bolt bolt = bolts.get(name);
                    JSONObject conf = (JSONObject)parser.parse(bolt.get_common().get_json_conf());
                    if(conf.get(SITE) != null && supervisors.get(conf.get(SITE)) != null){
                        String site = (String)conf.get(SITE);
                        SupervisorDetails supervisor = supervisors.get(site);
                        List<WorkerSlot> workerSlots = cluster.getAvailableSlots(supervisor);
                        List<ExecutorDetails> executors = cluster.getNeedsSchedulingComponentToExecutors(t).get(name);
                        // Place all of the component's executors onto the first available slot.
                        if(!workerSlots.isEmpty() && executors != null){
                            cluster.assign(workerSlots.get(0), t.getId(), executors);
                        }
                    }
                }
                for(String name : spouts.keySet()){
                    SpoutSpec spout = spouts.get(name);
                    JSONObject conf = (JSONObject)parser.parse(spout.get_common().get_json_conf());
                    if(conf.get(SITE) != null && supervisors.get(conf.get(SITE)) != null){
                        String site = (String)conf.get(SITE);
                        SupervisorDetails supervisor = supervisors.get(site);
                        List<WorkerSlot> workerSlots = cluster.getAvailableSlots(supervisor);
                        List<ExecutorDetails> executors = cluster.getNeedsSchedulingComponentToExecutors(t).get(name);
                        if(!workerSlots.isEmpty() && executors != null){
                            cluster.assign(workerSlots.get(0), t.getId(), executors);
                        }
                    }
                }
            }catch(ParseException pe){
                pe.printStackTrace();
            }
        }
    }
}

Step 4: Install Scheduler

After finishing the scheduler, we need to package it into a jar file for installation. We recommend using Maven, as it automates the build nicely. You can use the POM below to compile and package your scheduler.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>storm.custom</groupId>
  <artifactId>scheduler</artifactId>
  <version>0.0.1</version>
  <name>storm-custom-schedulers</name>
  <description>Storm's custom schedulers</description>
  
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  
  <dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.4</version>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1.1</version>
    </dependency>
  </dependencies>
  
  <build>
    <plugins>
        <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-compiler-plugin</artifactId>
           <version>3.1</version>
           <configuration>
               <!-- or whatever version you use -->
             <source>1.7</source>
             <target>1.7</target>
           </configuration>
        </plugin>
    </plugins>
  </build>
</project>

Run the following command to rebuild the jar file every time you change the scheduler.

# mvn clean package -U

The command will create a jar file at $SCHEDULER_HOME/target/scheduler-0.0.1.jar. Copy it into $STORM_HOME/lib/ and tell Nimbus to use the new scheduler by appending the following lines to the configuration file at $STORM_HOME/conf/storm.yaml.

...
storm.scheduler: "storm.custom.scheduler.SiteAwareScheduler"
...

Restart the Nimbus daemon to apply the configuration changes. Again, we simply restart supervisord in this example.

# service supervisord restart

There is a working example at /root/scheduler in the VM image.

Note: you should always restart the supervisors before Nimbus, so that Nimbus can read the site metadata from the supervisors. Otherwise, Nimbus fails with a NullPointerException because the example scheduler cannot find an entry with the key “site” in the supervisors’ metadata.
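
If you prefer the scheduler to tolerate this situation instead, a defensive variant of the supervisor-indexing loop could look like the following (a sketch, not what is installed in the example image): supervisors without metadata are simply skipped rather than dereferenced.

for(SupervisorDetails s : supervisorDetails){
    Map<String, String> metadata = (Map<String, String>)s.getSchedulerMeta();
    // Skip supervisors that have not been tagged with a site instead of
    // throwing a NullPointerException.
    if(metadata == null || metadata.get(SITE) == null){
        continue;
    }
    supervisors.put(metadata.get(SITE), s);
}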

Step 5: Validation

Now it is time to validate that the scheduler is working correctly. Storm provides a web UI that shows the system’s status. Unfortunately, it does not display user metadata, so we need to validate the placement manually. The web UI is available at http://<nimbus-host-ip>:8080

First of all, submit a topology with site metadata to the cluster. When the topology starts running, you will see a link to it in the web UI, as shown in Figure 5.

Figure 5: Topology link

Click the topology link to open the topology view, which shows all the information about the topology. The spouts and bolts in the topology are listed as shown in Figure 6.

Figure 6: Spouts/Bolts

Because spouts and bolts are not the smallest scheduling unit, this view does not show where the components are running. Select any component to open its component view. There you will see a list of executors and their details, including a Host column that indicates which supervisor each executor is running on, as shown in Figure 7.

Figure 7: Executors

Since you know where each component should be running, you can tell whether it was scheduled correctly. Go through all the components to verify the placement.

Scheduler for Production

The example scheduler is designed only to demonstrate the site-aware scheduling mechanism and is far from production-ready. It relies on the following assumptions.

  1. All submitted topologies and supervisors have correct site metadata.
  2. There is always at least one worker slot available on every supervisor at any point in time.
  3. Every topology distributes its components evenly across the sites involved in the computation.

However, these assumptions may not hold in production, which calls for more careful handling.

To address the first problem, we can add a fallback scheduler that claims resources on supervisors without site metadata and schedules topology components without site information. Storm’s built-in EvenScheduler is a good candidate, as it distributes the workload evenly among supervisors and effectively avoids hot spots; see the sketch below.
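
One possible way to wire this in (a sketch, assuming Storm 0.9.x where the class lives at backtype.storm.scheduler.EvenScheduler) is to run the even scheduler after the site-aware pass, so that anything left unscheduled is picked up by it:

import backtype.storm.scheduler.EvenScheduler;
...
@Override
public void schedule(Topologies topologies, Cluster cluster) {
    // ... site-aware assignment as shown above ...

    // Topologies or components without usable site metadata are still
    // unscheduled at this point; hand them to Storm's built-in EvenScheduler.
    new EvenScheduler().schedule(topologies, cluster);
}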

To solve the second problem, we could either group the supervisors at a particular site to provide sufficient computing resources, or assign priorities to tasks to enable preemption. To group supervisors, we need to map each site to a list of supervisors instead of a single one, as shown in the code below.

...
Map<String, List<SupervisorDetails>> supervisors = new HashMap<String, List<SupervisorDetails>>();
for(SupervisorDetails s : supervisorDetails){
    Map<String, String> meta = (Map<String, String>) s.getSchedulerMeta();
    if(meta != null && meta.get(SITE) != null){
        String site = meta.get(SITE);
        if(supervisors.get(site) == null){
            supervisors.put(site, new CopyOnWriteArrayList<SupervisorDetails>());
        }
        supervisors.get(site).add(s);
    }
}
...

To preempt tasks, Storm’s Cluster offers a freeSlot(WorkerSlot slot) method. We will not discuss the design of the priority rules here, as it is out of the scope of this post; the sketch below only shows the mechanics.
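
As a rough illustration (the pickLowPrioritySlot helper is hypothetical and stands in for whatever priority policy you choose; cluster and supervisor come from the surrounding scheduler code), a slot can be reclaimed like this. The evicted executors go back into the needs-scheduling pool and are re-assigned on a later scheduling round.

if(cluster.getAvailableSlots(supervisor).isEmpty()){
    // pickLowPrioritySlot is a hypothetical helper encapsulating the priority policy.
    WorkerSlot victim = pickLowPrioritySlot(cluster, supervisor);
    if(victim != null){
        cluster.freeSlot(victim);
    }
}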

Load balancing is also important in a production environment, especially since site-aware scheduling may exacerbate overload on popular sites. Fortunately, Storm supports fine-grained scheduling: the minimum scheduling unit is an executor, which is basically a Java thread. As you may have noticed in the example code, you can get a list of executors for each topology component. You are free to break the list into multiple sub-lists and distribute them over multiple supervisors at a particular site, instead of putting a monolithic component on a single supervisor and leaving the others idle; in fact, Storm’s default scheduler schedules topologies in this way. In terms of fault tolerance, it is also good practice to spread the executors of a topology component over multiple supervisors, so that the topology keeps running if a supervisor node fails.
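
A minimal sketch of this idea, building on the per-site supervisor lists above (t, name and site are assumed to come from the surrounding loops of the example scheduler), spreads a component’s executors round-robin over all available worker slots at the target site:

List<ExecutorDetails> executors = cluster.getNeedsSchedulingComponentToExecutors(t).get(name);
List<WorkerSlot> slots = new ArrayList<WorkerSlot>();
for(SupervisorDetails s : supervisors.get(site)){
    slots.addAll(cluster.getAvailableSlots(s));
}
if(executors != null && !slots.isEmpty()){
    // Distribute the executors round-robin over the site's worker slots.
    Map<WorkerSlot, List<ExecutorDetails>> plan = new HashMap<WorkerSlot, List<ExecutorDetails>>();
    for(int i = 0; i < executors.size(); i++){
        WorkerSlot slot = slots.get(i % slots.size());
        if(plan.get(slot) == null){
            plan.put(slot, new ArrayList<ExecutorDetails>());
        }
        plan.get(slot).add(executors.get(i));
    }
    for(Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : plan.entrySet()){
        cluster.assign(entry.getKey(), t.getId(), entry.getValue());
    }
}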

References

The solution is inspired by this post.
