Spark Job Optimization
Optimizing a Spark job can be a daunting task. You have a job that is functionally complete, but it keeps failing with seemingly random exceptions. To make matters worse, those exceptions keep changing or don't match your expectations. There are many reasons for this, but a big one is that it is easy to write a Spark job without really understanding how Spark works.
To that end, I'm starting an occasional series on optimizing Spark jobs. The series will dive into the inner workings of Spark, so that you can optimize your jobs by better understanding what they actually do.
The first topic we're going to tackle is data skew. Data skew happens when, for one reason or another, a small percentage of partitions get most of the data being processed. In normal usage, Spark generally splits the data evenly across all tasks, so there isn't a big risk of skew. When you do a join, however, Spark distributes the data by join key, so that rows with the same key from both tables end up in the same task. If a lot of rows share the same key, the tasks that handle those keys take much longer than the others.
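To make that concrete, here is a rough sketch in plain Python (not the Spark API) of what distributing rows by join key does when one key dominates. The partition_for function is only a stand-in for Spark's hash partitioning, and the key names and row counts are made up for illustration.

```python
from collections import Counter
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Pick a partition from a stable hash of the key (a stand-in for hash partitioning)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_sizes(keys, num_partitions):
    """Count how many rows land in each partition."""
    counts = Counter(partition_for(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


# Hypothetical data: 1,000 rows share one "hot" key, 1,000 more have unique keys.
keys = ["hot_customer"] * 1000 + [f"customer_{i}" for i in range(1000)]
sizes = partition_sizes(keys, num_partitions=8)

print(sizes)
print("largest partition holds", max(sizes), "of", len(keys), "rows")
```

Every row with the hot key hashes to the same partition, so that one partition holds at least half of the data no matter how many partitions you add.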
Overall, data skew generally will lead to one of two situations.
The first is that the job just plain fails because workers run out of memory. This may show up as a java.lang.OutOfMemoryError or a similar error. Developers often respond by increasing the executor memory limit until everything passes. While this does get the job running, it uses far more resources than necessary. It also leads to the second situation.
The second situation is that jobs take much longer than they should. This isn't as catastrophic as the first situation and is often overlooked, but it is still an issue. It can be difficult to detect, but looking at the job's entry in the History Server will help.
Before we get to that, we need to discuss a bit about how Spark works under the hood. Data in Spark is split into individual partitions. Any time your code calls an action, that is, a function that requires some execution to occur, Spark creates a job to perform that action and splits it into one or more stages, with a new stage starting wherever data has to be shuffled. Examples of actions include collect, show, and count. A stage in turn has a child task for each partition, and each task runs on exactly one executor (assuming no failed tasks or speculative execution).
In the Spark History Server, look through the list of stages for the ones that are taking the longest. Once you click on a stage, you'll see a page similar to the one below.
The job shown here is obviously very small, but the same values will be visible for a larger job that needs optimization. At the top, you can see the minimum, maximum, and various percentile values for different metrics. Note that in the "Show Additional Metrics" section I've selected Peak Execution Memory. Skew will be visible here when duration or peak execution memory is bottom-heavy. That is to say, the difference between the minimum and the 75th percentile is small compared to the difference between the 75th percentile and the maximum. This means that a few tasks took much longer than the others in this stage, which is indicative of skew.
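If you'd rather check this rule of thumb with numbers than by eye, the comparison can be sketched in a few lines of plain Python. The task durations below are made up, and the ratio of 2.0 is an arbitrary threshold chosen for illustration, not something Spark defines.

```python
import statistics


def skew_summary(durations):
    """Return (min, 75th percentile, max) for a list of task durations."""
    ordered = sorted(durations)
    # quantiles(n=4) returns the three quartile cut points; index 2 is the 75th percentile.
    p75 = statistics.quantiles(ordered, n=4, method="inclusive")[2]
    return ordered[0], p75, ordered[-1]


def looks_skewed(durations, ratio=2.0):
    """Flag a stage when (max - p75) is more than `ratio` times (p75 - min).

    `ratio` is a hypothetical threshold; tune it for your own jobs.
    """
    lo, p75, hi = skew_summary(durations)
    return (hi - p75) > ratio * (p75 - lo)


# Hypothetical task durations in seconds for two stages.
even_stage = [10, 11, 12, 12, 13, 13, 14, 15]
skewed_stage = [10, 11, 12, 12, 13, 13, 14, 240]

print(looks_skewed(even_stage))
print(looks_skewed(skewed_stage))
```

In the second stage, seven tasks finish within a few seconds of each other and one straggler takes minutes, which is exactly the bottom-heavy shape described above.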
For more verification of this, you can look at the individual tasks at the bottom, sorting by duration. This allows you to see the specific tasks that took a long time, and check their logs to ensure there weren't any extenuating circumstances causing the issue, instead of skew.
So how do we solve this issue, now that we've identified it? The easiest way is to tell Spark to broadcast the table that has skew. If the table with skew also happens to be small, we can copy it to every executor, so the join no longer needs to shuffle rows by key and the skew disappears. This is often most useful when you have a slow job that has a few long-running tasks, but memory usage is not a huge issue.
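To see why a broadcast sidesteps skew, here is a minimal sketch in plain Python of the idea behind a broadcast join. It is a simulation, not the Spark API: the function name and sample tables are hypothetical, and a real executor would receive the small table over the network rather than share a dictionary.

```python
def broadcast_join(large_partitions, small_table):
    """Inner-join each partition of the large table against a full copy of the small table.

    large_partitions: list of partitions, each a list of (key, value) rows.
    small_table: list of (key, value) rows, small enough to copy everywhere.
    """
    # Build the lookup once; in Spark, each executor would hold its own copy.
    lookup = {}
    for key, value in small_table:
        lookup.setdefault(key, []).append(value)

    joined = []
    for partition in large_partitions:
        # Rows stay in the partition they were already in: nothing is regrouped by key.
        joined.append([
            (key, left, right)
            for key, left in partition
            for right in lookup.get(key, [])
        ])
    return joined


# Hypothetical data: "us" is the hot key, already spread over two partitions.
orders = [[("us", 1), ("us", 2), ("de", 3)], [("us", 4), ("fr", 5), ("us", 6)]]
countries = [("us", "United States"), ("de", "Germany")]

result = broadcast_join(orders, countries)
print([len(partition) for partition in result])
```

Because no rows are moved to line up by key, the hot key never piles up in a single task. The cost is that every executor has to hold the whole small table in memory.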
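To use a broadcast hint, you can use either Spark SQL or normal code. In SQL, the hint is a comment of the form /*+ BROADCAST(table_name) */ placed directly after SELECT. In Scala, you wrap the DataFrame you want broadcast in the broadcast function from org.apache.spark.sql.functions before joining it.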
Broadcasts may be done automatically as well, but only if statistics are available for the data. Spark broadcasts a table on its own when it estimates the table to be smaller than spark.sql.autoBroadcastJoinThreshold, which is 10 MB by default. If statistics aren't currently available, running an ANALYZE TABLE query with COMPUTE STATISTICS on the Hive table will compute them, allowing broadcasts to be done automatically. For best results, this query needs to be run regularly, preferably immediately after significant changes to the table such as inserts.
One final note: due to SPARK-17556, whenever you do a broadcast, the data is pulled onto the driver before being pushed out to all of the worker nodes. This can be surprising the first time, especially if you are diligent about keeping driver memory usage low. Because of this, if you see driver-side OOM exceptions, it may be reasonable to raise your driver memory to match your executor memory, so that anything broadcast to your executors can fit on your driver as well.
More information can be found here. Note that the broadcast hint is just that: a hint. That means that Spark can ignore it, and actually will ignore it if the table is too large.
Similarly, there is also a skew hint. This allows Spark to do various optimizations based on the assumption that a key is skewed. Note that this is a feature specific to Databricks, so it is not available on HDP or CDH. Because of that, I'm not going to spend much more time on it, but you can check out more information about it here.
Randomizing the Join Key
Now we start getting into some of the less conventional methods for dealing with skew. The first one is randomizing the join key on the skewed table. This is based on an implementation by the Data-R-Us Blog. This method assumes that the skewed table is the larger of the two tables, and that the other table is still too large to broadcast.
The first step is to modify the keys on the skewed table with a random suffix. For example, if the key is a string, you might add "-N" at the end of each key, where N is a random number between 0 and 9 (or some other range, based on how big your dataset is and how many partitions per key you want to create). In the other table, you then make one copy of the data for each possible value of N, with the matching "-N" appended to the keys in that copy. So for example, if you chose N to be between 0 and 2, then you would replicate the smaller table 3 times: one copy with the keys + "-0", one with the keys + "-1", and one with the keys + "-2". This way, every key in the skewed table matches exactly one copy of the corresponding row in the smaller table.
You can then join the two tables together as normal. Afterwards, strip the suffix from the key to get back the same keys you had before.
This spreads each hot key over several smaller partitions, so the skew will not affect the time and memory usage as much. The trade-off is that it increases the data size, because the other table is replicated once per suffix. This is something to consider when choosing a strategy for dealing with skew.
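The whole round trip (add a suffix, replicate, join, strip) can be sketched in plain Python. This technique is often called salting. The sketch below is a simulation under assumptions, not the Data-R-Us implementation: the helper names, the bucket count of 3, and the sample rows are all made up for illustration.

```python
import random
from collections import defaultdict

SALT_BUCKETS = 3  # hypothetical choice: suffixes -0, -1 and -2


def salt_skewed(rows, buckets, rng):
    """Append a random "-N" suffix to every key in the skewed table."""
    return [(f"{key}-{rng.randrange(buckets)}", value) for key, value in rows]


def replicate_other(rows, buckets):
    """Copy every row of the other table once per suffix, so each salted key finds a match."""
    return [(f"{key}-{n}", value) for key, value in rows for n in range(buckets)]


def hash_join(left, right):
    """Plain inner join on the first field of each row."""
    index = defaultdict(list)
    for key, value in right:
        index[key].append(value)
    return [(key, lv, rv) for key, lv in left for rv in index.get(key, [])]


def strip_salt(rows):
    """Remove the trailing "-N" suffix to recover the original key."""
    return [(key.rsplit("-", 1)[0], lv, rv) for key, lv, rv in rows]


rng = random.Random(42)
skewed = [("hot", i) for i in range(9)] + [("cold", 100)]
other = [("hot", "H"), ("cold", "C")]

salted_result = strip_salt(
    hash_join(salt_skewed(skewed, SALT_BUCKETS, rng), replicate_other(other, SALT_BUCKETS))
)
plain_result = hash_join(skewed, other)

# The salted join returns the same rows as the plain join.
print(sorted(salted_result) == sorted(plain_result))  # prints True
```

The other table grew from 2 rows to 6 here, which is the data size cost mentioned above. A larger bucket count spreads the hot key further but replicates more data.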
The last approach I'm going to mention here is a crazy yet effective one posted by Sim at Swoop. The approach here is similar to the random approach above, but is more transparent to users, allowing it to be added as a library that is reused across teams.
In this approach, you create a custom partitioner that purposefully distributes the rows for a skewed key across multiple hosts. You then send the rows with the corresponding key from the other table to all of those hosts, to ensure that every row can still be joined properly.
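As a very rough illustration of the idea, here is a sketch in plain Python. To be clear, this is not Swoop's implementation and not a Spark Partitioner: the class, its method names, and the round-robin placement are all assumptions made up to show how the two sides of the join stay aligned.

```python
import itertools
import zlib


class SkewAwarePartitioner:
    """Hypothetical partitioner: hot keys are spread over several partitions, others are hashed."""

    def __init__(self, num_partitions, hot_keys, spread):
        self.num_partitions = num_partitions
        self.hot_keys = set(hot_keys)
        # Never spread a key over more partitions than exist.
        self.spread = min(spread, num_partitions)
        self._round_robin = itertools.count()

    def targets(self, key):
        """All partitions that may hold `key`: `spread` of them for a hot key, one otherwise."""
        base = zlib.crc32(key.encode("utf-8")) % self.num_partitions
        width = self.spread if key in self.hot_keys else 1
        return [(base + i) % self.num_partitions for i in range(width)]

    def partition_skewed_row(self, key):
        """Skewed table: successive rows rotate through the key's target partitions."""
        options = self.targets(key)
        return options[next(self._round_robin) % len(options)]

    def partitions_other_row(self, key):
        """Other table: a row is copied to every partition its key may live in."""
        return self.targets(key)


partitioner = SkewAwarePartitioner(num_partitions=8, hot_keys={"hot"}, spread=4)
skewed_placement = [partitioner.partition_skewed_row("hot") for _ in range(8)]

print(sorted(set(skewed_placement)))
print(sorted(partitioner.partitions_other_row("hot")))
```

The two printed lists are the same set of partitions: wherever a row with the hot key lands, a copy of the matching row from the other table is waiting for it. Keys that are not marked as hot still go to a single partition, so only the skewed keys pay the replication cost.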
This is not an approach for the weak, but if you can implement it well, it can work across many teams.
Skew can make a good Spark job work poorly. It takes up resources you don't need to use and makes your jobs run slowly at the same time. A lot of keys are naturally skewed, so this issue comes up pretty often. Hopefully, the solutions above help you reduce the amount of skew in your system, or at least put you on the right path. If you have another approach to reducing skew, feel free to add it to the comments below or send me a message!