Running Spark on Alluxio

This guide describes how to run Apache Spark on Alluxio. HDFS is used as an example of a distributed under storage system. Note that Alluxio supports many other under storage systems in addition to HDFS, and enables frameworks like Spark to read data from or write data to any of them.

Compatibility

Alluxio works with Spark 1.1 or later out of the box.

Prerequisites

General Setup

  • An Alluxio cluster has been set up in accordance with these guides for either Local Mode or Cluster Mode.

  • The Alluxio client needs to be compiled with the Spark-specific profile. Build the entire project from the top-level alluxio directory with the following command:

$ mvn clean package -Pspark -DskipTests
  • Add the following lines to spark/conf/spark-defaults.conf (an equivalent command-line form is sketched after this list):
spark.driver.extraClassPath /pathToAlluxio/core/client/target/alluxio-core-client-1.4.0-jar-with-dependencies.jar
spark.executor.extraClassPath /pathToAlluxio/core/client/target/alluxio-core-client-1.4.0-jar-with-dependencies.jar
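
If you prefer not to edit spark-defaults.conf, the equivalent settings can also be passed on the command line when launching spark-shell or spark-submit. A minimal sketch, assuming the same jar path as above:

$ spark-shell --driver-class-path /pathToAlluxio/core/client/target/alluxio-core-client-1.4.0-jar-with-dependencies.jar --conf spark.executor.extraClassPath=/pathToAlluxio/core/client/target/alluxio-core-client-1.4.0-jar-with-dependencies.jar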

Additional Setup for HDFS

  • If Alluxio is run on top of a Hadoop 1.x cluster, create a new file spark/conf/core-site.xml with the following content:
<configuration>
  <property>
    <name>fs.alluxio.impl</name>
    <value>alluxio.hadoop.FileSystem</value>
  </property>
</configuration>
  • If you are running Alluxio in fault tolerant mode with ZooKeeper and the Hadoop cluster is 1.x, add the following additional entry to the previously created spark/conf/core-site.xml:
<property>
  <name>fs.alluxio-ft.impl</name>
  <value>alluxio.hadoop.FaultTolerantFileSystem</value>
</property>

and the following lines to spark/conf/spark-defaults.conf (a command-line alternative is sketched below):

spark.driver.extraJavaOptions -Dalluxio.zookeeper.address=zookeeperHost1:2181,zookeeperHost2:2181 -Dalluxio.zookeeper.enabled=true
spark.executor.extraJavaOptions -Dalluxio.zookeeper.address=zookeeperHost1:2181,zookeeperHost2:2181 -Dalluxio.zookeeper.enabled=true
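
The same ZooKeeper options can instead be supplied on the command line at launch time; a sketch, reusing the addresses above:

$ spark-shell --conf spark.driver.extraJavaOptions="-Dalluxio.zookeeper.address=zookeeperHost1:2181,zookeeperHost2:2181 -Dalluxio.zookeeper.enabled=true" --conf spark.executor.extraJavaOptions="-Dalluxio.zookeeper.address=zookeeperHost1:2181,zookeeperHost2:2181 -Dalluxio.zookeeper.enabled=true"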

Use Alluxio as Input and Output

This section shows how to use Alluxio as input and output sources for your Spark applications.

Use Data Already in Alluxio

First, we will copy some local data to the Alluxio file system. Put the file LICENSE into Alluxio, assuming you are in the Alluxio project directory:

$ bin/alluxio fs copyFromLocal LICENSE /LICENSE

Run the following commands from spark-shell, assuming the Alluxio master is running on localhost:

> val s = sc.textFile("alluxio://localhost:19998/LICENSE")
> val double = s.map(line => line + line)
> double.saveAsTextFile("alluxio://localhost:19998/LICENSE2")

Open your browser and check http://localhost:19999/browse. There should be an output file LICENSE2 which doubles each line in the file LICENSE.
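
You can also verify the output from the command line with the Alluxio shell. saveAsTextFile writes LICENSE2 as a directory of part files, so a quick check (the exact part file names depend on the number of partitions) might look like:

$ bin/alluxio fs ls /LICENSE2
$ bin/alluxio fs cat /LICENSE2/part-00000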

Use Data from HDFS

Alluxio supports transparently fetching data from the under storage system, given the exact path. Put a file LICENSE into HDFS under the folder Alluxio is mounted to. By default this is /alluxio, meaning any files in HDFS under this folder will be discoverable by Alluxio. You can modify this setting by changing the ALLUXIO_UNDERFS_ADDRESS property in alluxio-env.sh on the server.
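
For reference, a minimal sketch of this setting in conf/alluxio-env.sh, assuming the namenode address (hdfs://localhost:9000) and mount folder (/alluxio) used in the example below:

ALLUXIO_UNDERFS_ADDRESS=hdfs://localhost:9000/alluxio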

Assuming the namenode is running on localhost and you are using the default mount directory /alluxio:

$ hadoop fs -put -f /alluxio/LICENSE hdfs://localhost:9000/alluxio/LICENSE

Note that Alluxio has no notion of the file yet; you can verify this by going to the web UI. Run the following commands from spark-shell, assuming the Alluxio master is running on localhost:

> val s = sc.textFile("alluxio://localhost:19998/LICENSE")
> val double = s.map(line => line + line)
> double.saveAsTextFile("alluxio://localhost:19998/LICENSE2")

Open your browser and check http://localhost:19999/browse. There should be an output file LICENSE2 which doubles each line in the file LICENSE. Also, the LICENSE file now appears in the Alluxio file system space.

NOTE: Block caching on partial reads is enabled by default, but if you have turned this option off, it is possible that the LICENSE file is not in Alluxio storage (not In-Memory). This is because Alluxio only stores fully read blocks, and if the file is too small, the Spark job will have each executor read a partial block. To avoid this behavior, you can specify the partition count in Spark. For this example, we set it to 1, as there is only 1 block.

> val s = sc.textFile("alluxio://localhost:19998/LICENSE", 1)
> val double = s.map(line => line + line)
> double.saveAsTextFile("alluxio://localhost:19998/LICENSE2")
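
Alternatively, if you want the whole file resident in Alluxio storage regardless of how Spark reads it, you can load it explicitly with the Alluxio shell before running the job; a sketch:

$ bin/alluxio fs load /LICENSE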

Using Fault Tolerant Mode

When running Alluxio in fault tolerant mode, you can point to any Alluxio master:

> val s = sc.textFile("alluxio-ft://stanbyHost:19998/LICENSE")
> val double = s.map(line => line + line)
> double.saveAsTextFile("alluxio-ft://activeHost:19998/LICENSE2")

Data Locality

If Spark task locality is ANY when it should be NODE_LOCAL, it is probably because Alluxio and Spark use different network address representations; one may use hostnames while the other uses IP addresses. Please refer to this JIRA ticket for more details, where you can find solutions from the Spark community.

Note: Alluxio uses hostnames to represent network addresses, except in version 0.7.1, where IP addresses are used. Spark v1.5.x ships with Alluxio v0.7.1 by default, so in that case Spark and Alluxio both use IP addresses and data locality should work out of the box. Since release 0.8.0, however, Alluxio represents network addresses by hostname to be consistent with HDFS. There is a workaround when launching Spark to achieve data locality: users can explicitly specify hostnames using the following script offered by Spark. Start a Spark Worker on each slave node with slave-hostname:

$ $SPARK_HOME/sbin/start-slave.sh -h <slave-hostname> <spark master uri>

For example:

$ $SPARK_HOME/sbin/start-slave.sh -h simple30 spark://simple27:7077

You can also set SPARK_LOCAL_HOSTNAME in $SPARK_HOME/conf/spark-env.sh to achieve this. For example:

SPARK_LOCAL_HOSTNAME=simple30

Either way, the Spark Worker addresses become hostnames and the locality level becomes NODE_LOCAL, as shown in the Spark web UI below.

[Screenshots: Spark web UI showing the worker hostnames and the NODE_LOCAL locality level]

Running Spark on YARN

To maximize the amount of locality your Spark jobs attain, you should use as many executors as possible, ideally at least one executor per node. As with all methods of Alluxio deployment, there should also be an Alluxio worker on every computation node.

When a Spark job is run on YARN, Spark launches its executors without taking data locality into account. Spark will then correctly take data locality into account when deciding how to distribute tasks to its executors. For example, if host1 contains blockA and a job using blockA is launched on the YARN cluster with --num-executors=1, Spark might place its only executor on host2, resulting in poor locality. However, if --num-executors=2 and executors are started on host1 and host2, Spark will be smart enough to prioritize running the job on host1.
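
As a sketch of the example above, a YARN submission requesting two executors might look like the following; the application jar and class name here are placeholders:

$ spark-submit --master yarn --deploy-mode client --num-executors 2 --class com.example.MyAlluxioApp my-alluxio-app.jar alluxio://localhost:19998/LICENSE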
