Background
One of the things I see most often when looking at clients' clusters is poor use of cluster resources. I see clients sitting at 100% memory usage but only 50% CPU usage complain that they need more nodes to run their jobs. Others have jobs that sit on the cluster for ages while they compute, taking up resources and slowing down everyone else. These sorts of issues can sometimes be managed through YARN queue management, but that ignores their root causes.
Additionally, I'm often told that Spark jobs run slower than Hive jobs on a given cluster, so the team has never moved to Spark. While a Hive job can sometimes beat a Spark job, in most cases Spark is the better option, and its performance should be at least comparable.
While these two issues seem to be separate, they often can be traced to the same root cause: a poorly optimized Spark job running on the cluster. For the new year, I'm going to be starting a series on myths that I've seen people believe when it comes to Spark job optimization. Some of these might be obvious to you, and others may come as a surprise, but I hope the information is useful.
For all of these, I'm running tests on a local 3-node HDP 2.6.1 cluster using YARN. I've given YARN a total of 6 GB of memory and 4 cores, leaving all other configuration settings at their defaults, and I haven't enabled security of any kind. I'm using Vagrant to set this up, and have supplied the Vagrantfile on GitHub.
Spark on YARN Details
One of the biggest issues I see is developers who just keep increasing memory whenever they run into problems. Not only is this approach haphazard and prone to inconsistent results, it also doesn't actually do what they think it does.
To understand why, it's important to go back to basics. Spark only runs jobs when an action is called: an operation that requires immediate feedback, such as collect or count. An action is made up of one or more stages, which represent the operations leading up to that action, starting from either the source of the data or cached results that were previously calculated. Stages are split at the points where a shuffle, or movement of data, is required, so multiple operations can be contained in a single stage. A stage is further split into tasks, each of which handles the operation for a subset of the data. The task is the base unit of work, and it is the level at which Spark job optimization should start.
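As a quick illustration of that laziness, here is a minimal sketch (the app name and numbers are arbitrary, chosen just for this example): nothing executes until the count on the last line is reached.

```scala
import org.apache.spark.sql.SparkSession

object LazyEvaluationDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("lazy-demo").getOrCreate()
    import spark.implicits._

    // Transformations only build up an execution plan; no work happens yet
    val numbers = spark.range(0, 1000000)
    val evens   = numbers.filter($"id" % 2 === 0)

    // The action is what triggers a job: Spark now builds the stages,
    // splits each stage into tasks, and runs those tasks on the executors
    println(s"Even numbers: ${evens.count()}")

    spark.stop()
  }
}
```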
The graphic below shows how all of this works in a real-world, albeit simplified, example. We have a simple program that reads in data, groups it by the tens and ones place, and then gets the count of each value. As you can see, the action here is the last line, which writes out to a CSV file. When that line is executed, we first need to read the file and compute the remainder, which makes up the first stage. The group by requires a shuffle, so it is moved to a second stage, along with the writing of the data into HDFS.
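A rough sketch of that program might look like the following; the input path, output path, and column name are placeholders rather than the exact code from the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object RemainderCounts {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("remainder-counts").getOrCreate()

    // Stage 1: read the file and compute the remainder (the tens and ones place)
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("hdfs:///data/numbers.csv")              // placeholder input path
      .withColumn("remainder", col("value") % 100)  // "value" is a placeholder column name

    // Stage 2: the group by forces a shuffle, then the counts are written to HDFS.
    // The write is the action; nothing runs until this line.
    df.groupBy("remainder")
      .count()
      .write
      .mode("overwrite")
      .csv("hdfs:///output/remainder-counts")       // placeholder output path

    spark.stop()
  }
}
```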
Myth #1: Increasing the Memory Per Executor Always Improves Performance
Getting back to the question at hand: the executor is what we are modifying memory for. A given executor will run one or more tasks at a time. Assuming a single executor core for now for simplicity's sake (more on that in a future post), the executor memory is given entirely to that one task. In this instance, increasing the executor memory increases the amount of memory available to the task, and nothing more.
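To be concrete about which knobs we're talking about, here's a sketch of where the executor memory and core settings live; the values are purely illustrative, not recommendations.

```scala
import org.apache.spark.sql.SparkSession

object ExecutorSizingSketch {
  def main(args: Array[String]): Unit = {
    // These can equally be passed to spark-submit as --executor-memory and --executor-cores
    val spark = SparkSession.builder()
      .appName("executor-sizing-sketch")
      .config("spark.executor.cores", "1")   // one core: the executor runs one task at a time
      .config("spark.executor.memory", "2g") // all of this memory serves that single task
      .getOrCreate()

    // ... the job itself would go here ...
    spark.stop()
  }
}
```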
Given this, it's pretty clear that increasing memory is not going to make a task run faster. And because the change only affects a single task (no cross-talk due to multiple executor cores or parallelism or anything like that), increasing the executor memory is not going to give us the performance improvement we're after.
What Then?
So if increasing memory isn't the right answer, what is? There are two things we can improve: we can make more tasks run at the same time, and we can make the individual tasks run faster. Making individual tasks run faster is not easy to do with the newer Spark DataFrame API, since that optimization is largely done for you automatically. Because of this, we'll focus on making more tasks run at the same time.
The first step is to check whether tasks are currently being serialized on the executor, that is, waiting their turn for a slot. In the Spark History Server, you can see the timeline for a given stage. If it looks something like the one below, then you have tasks waiting for time on the executor. If we increase the number of executors, more tasks can run at once.
Keep in mind that increasing the number of executors will increase the resource usage of your job, but in this case, it will also improve performance, so the trade-off can be worth it.
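A sketch of that change, with purely illustrative numbers: it's the executor count, rather than the memory, that lets more of those queued tasks run in parallel.

```scala
import org.apache.spark.sql.SparkSession

object MoreExecutorsSketch {
  def main(args: Array[String]): Unit = {
    // Equivalent to --num-executors on spark-submit. Going from 2 to 4 executors
    // (each with a single core) doubles the number of tasks that can run at once,
    // at the cost of twice the memory and cores requested from YARN.
    val spark = SparkSession.builder()
      .appName("more-executors-sketch")
      .config("spark.executor.instances", "4") // was 2: more executors, more parallel tasks
      .config("spark.executor.cores", "1")
      .config("spark.executor.memory", "1g")   // unchanged: memory was never the bottleneck
      .getOrCreate()

    // ... the job itself would go here ...
    spark.stop()
  }
}
```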
This obviously isn't the only way to tackle this issue, and we'll discuss other techniques in future posts.
Then How Much Memory Is Enough?
Now that we've seen how to improve performance, the next question is often how much memory we should give each executor. This is a trickier question and, as with most things in technology, the general answer is "it depends".
One place to start is the size of the data you are processing. If you are processing 2 GB of data (not exactly big data, but it happens), then it's safe to assume that 2 GB is a good maximum executor memory size. You may need more, but that should be rare.
Also, keep in mind that as long as you aren't doing joins or group bys, you can safely assume each executor will hold roughly (Total Size of Data) / (Number of Tasks) of data. I wouldn't stick strictly to this calculation, however; I'd allow up to double that value to leave room for spikes in input data size. Joins and group bys introduce the possibility of skew, or of data from other sources being pulled in, so more memory will be necessary there too. One final thing to keep in mind is that the number of tasks is determined in different ways depending on the source of the data frame, and you can use repartition to change the number of tasks if necessary.
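Here's a back-of-envelope sketch of that calculation, along with repartition; the data size and task counts are made up purely for illustration.

```scala
import org.apache.spark.sql.SparkSession

object MemorySizingSketch {
  def main(args: Array[String]): Unit = {
    // Back-of-envelope version of the sizing rule above; the numbers are hypothetical
    val totalDataGb   = 64.0   // total size of the input data in GB
    val numberOfTasks = 200    // taken from the stage view in the Spark History Server
    val perTaskGb     = totalDataGb / numberOfTasks // ~0.32 GB of data held per task
    val suggestedGb   = perTaskGb * 2               // double it to absorb spikes in input size
    println(f"Suggested executor memory: ~$suggestedGb%.2f GB")

    // repartition changes the number of tasks if the default doesn't suit the data
    val spark = SparkSession.builder().appName("sizing-sketch").getOrCreate()
    val df = spark.range(0, 1000000).toDF("value")
    println(s"Default partitions: ${df.rdd.getNumPartitions}")
    val repartitioned = df.repartition(400)
    println(s"After repartition:  ${repartitioned.rdd.getNumPartitions}")
    spark.stop()
  }
}
```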
One last way to determine whether your current memory allocation is too high is to look at the Spark History Server again. On the stage details page, you can find output that looks like the following:
As you can see here, we had two tasks run, each of which processed 15-20 MB of data. The input dataset I'm using is 34.2 MB, so this makes sense. Based on this, we can probably set the executor memory to 128 MB and still have an order of magnitude of headroom for any spikes we might see. Doing this will not make performance any worse, and may in fact make it slightly better, thanks to shorter garbage collection times and reduced wait times on the YARN queue.
Conclusion
This is obviously a very cursory glance at the issue, which we will explore further in the weeks to come. These are not the only techniques available, nor is this the full story on the ones shown here. Hopefully it has given you some insight into how to optimize your jobs without increasing memory, and some tools to use going forward. Next week, we'll discuss the opposite case: why increasing the number of executors won't always improve your performance.