Background
Last week, I introduced a series focusing on myths I often see around Apache Spark job optimization. My first post looked at the common reaction among developers to add memory when they need to speed up their jobs. We discussed why this doesn't always work, and what to do instead. One part of that solution was to increase the number of executors instead, which can give you better results in some situations.
This week I'm going to turn that on its head and look at why increasing the number of executors doesn't always work either. It certainly works more often than increasing the amount of memory per executor, but there are still important cases where it doesn't behave the way you'd want it to.
This article assumes that you have developed a few Spark jobs and that you understand how Spark on YARN works. If you are not confident in that, please check out last week's article, which has a section covering this subject.
As with last week, I'm running tests on a local 3-node HDP 2.6.1 cluster using YARN. I've set YARN to have 6 GB of memory and 4 cores in total, but all other configuration settings are at their defaults, and I haven't enabled security of any kind. I'm using Vagrant to set this up and have supplied the Vagrantfile on GitHub.
I Thought We Were Supposed to Increase the Number of Executors?
If you read the first post last week, one of my final recommendations was to increase the number of executors, which often works. You may have tried this on your own jobs and found it still didn't improve performance. So did I give bad advice last week? Is your job already as fast as it can possibly be? Or is there something else going on?
I definitely didn't give bad advice last week, and the dozens of jobs I've optimized that way speak to that. Your job might indeed be as optimized as it can get, but there are plenty of other explanations as well. I'm going to go through a few possibilities that can cause this, and how to detect and handle each scenario. These don't cover every possible case, but they should give you a good idea of the possibilities.
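As a quick refresher before diving in, when I talk about increasing the number of executors, I just mean adjusting configuration along these lines. This is a minimal sketch, not code from the jobs discussed here; the app name and all values are placeholders you would tune for your own cluster:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: executor count is just configuration. The same settings can be
// passed to spark-submit as --num-executors, --executor-cores, and --executor-memory.
val spark = SparkSession.builder()
  .appName("executor-count-example")        // placeholder name
  .config("spark.executor.instances", "6")  // number of executors (placeholder value)
  .config("spark.executor.cores", "2")      // cores per executor (placeholder value)
  .config("spark.executor.memory", "1g")    // memory per executor (placeholder value)
  .getOrCreate()
```

The question for the rest of this post is why bumping that instance count sometimes changes nothing.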
Data Skew
Data skew is common in Spark jobs, and it often shows up in situations where you wouldn't expect it. Whenever I find that a job isn't running as fast as I'd expect, one of the first things I check is data skew. I've handled this topic in depth in a previous post, so here I'll just mention how to detect skew. If you find you have skew, that post does a good job of walking through your options.
The screenshot above is from the Spark History Server's details page for a specific stage. In this screenshot, you can see the skew pretty easily. The bottom 75% of tasks took 20 milliseconds to run, yet the slowest task took 800 milliseconds. If we dive into the individual task times and sort by duration, we find that the longest run time (which here is confusingly labeled as having taken 1 second) is more than three times longer than the next longest task.
This is a textbook example of skew. Not every skewed job will be this obvious, but the same techniques will reveal them. Thankfully the Spark History Server makes this pretty easy.
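If you prefer to check for skew programmatically rather than eyeballing the UI, a quick count of rows per key surfaces the same pattern. This is only a sketch; the generated `key` column below is a stand-in for your own join or group-by key:

```scala
import org.apache.spark.sql.functions.{desc, lit, rand, when}

// Hypothetical sketch (spark-shell style, `spark` already available): the skewed sample
// data here is only a stand-in for your own dataset and key column.
val df = spark.range(0, 1000000)
  .withColumn("key",
    when(rand() < 0.8, lit("hot"))                          // 80% of rows share one key
      .otherwise((rand() * 100).cast("int").cast("string")))

df.groupBy("key")
  .count()
  .orderBy(desc("count"))
  .show(10)  // one key dominating the counts is the same skew the UI shows
```

If a handful of keys dwarf the rest, adding executors mostly adds idle executors waiting on the one task that got the hot key.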
Long-Running Driver Code
Another common scenario I come across is developers misunderstanding how Spark is meant to be used and ending up with most of their code running on the driver. This issue is closely related to the overuse of collect, since it is often caused by collecting data locally in order to process it before sending it back out to the cluster. It goes without saying that if you are doing this, either everything fits in memory on the driver, in which case you should consider whether Spark is the right tool for the job, or it doesn't, and your job is going to fail.
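To make the shape of the problem concrete, here is a minimal sketch of the anti-pattern next to its distributed equivalent. The data and `expensiveTransform` are placeholders, and I'm assuming a spark-shell style session where `spark` already exists:

```scala
import spark.implicits._

// Placeholder data and transform, just to illustrate the shape of the problem.
val numbers = spark.range(0, 100000).as[Long]
def expensiveTransform(x: Long): Long = x * 2

// Anti-pattern: pull everything to the driver, process it there, ship it back.
val local = numbers.collect()                  // every row lands on the driver
val processed = local.map(expensiveTransform)  // single-threaded work on the driver
val backOnCluster = processed.toSeq.toDS()     // re-distributed only afterwards

// Distributed alternative: the same transform runs in parallel on the executors.
val result = numbers.map(expensiveTransform)
result.show(5)
```

In the first version, no number of extra executors helps, because the expensive work never runs on them.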
In these situations, there are several ways to detect how long the driver is running. The first and most obvious is looking at your logging infrastructure for the job. If you see a lot of custom log output coming from driver code, you're likely using the driver more than you should. You can even use the timestamps to get a feel for exactly how long the driver is taking.
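If you want those timestamps to be deliberate rather than incidental, a small helper that brackets driver-only sections works well. This is just a sketch; the `Thread.sleep` stands in for whatever local processing you suspect:

```scala
// Hypothetical sketch: bracket driver-only sections with timestamps so the logs show
// exactly how much time the driver spends outside of Spark jobs.
def timed[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  val elapsedMs = (System.nanoTime() - start) / 1000000
  println(s"$label took $elapsedMs ms on the driver")
  result
}

timed("driver-only section") {
  // Stand-in for whatever local processing you suspect; replace with your own code.
  Thread.sleep(2000)
}
```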
If you'd rather not rely on your own log statements, the Spark History Server can help. On the main page for the application, it gives the duration of each job, as well as the duration of the overall application. Comparing the two gives you an idea of how much time was spent outside of actually doing work. There is always going to be some overhead (on my small cluster, I saw around 50 - 60 seconds of overhead on average for simple applications), but if it gets to be too much, that's a warning flag. See the example below, which has an 84-second difference between the application runtime and the job runtime. That is much more than the typical 50 - 60 seconds I see for most jobs on this particular cluster, so it warrants further investigation.
How you fix these issues depends heavily on your driver code. Most often this indicates that your architecture is off and you're using the driver more than you should. Sometimes it's expected, and you just need to live with it or do some optimization on the driver code itself.
Not Enough Tasks
This is probably the closest you'll get to a job that can't be optimized further without actually reaching that point. In this scenario, the number of tasks is smaller than the number of executors you have requested. This can be the number of tasks for the specific stage you are trying to optimize, or the maximum number of tasks used across all stages of the job.
This is also easily detected in the Spark History Server, by comparing the number of tasks in a given stage to the number of executors you've requested. The easiest way to see how many tasks each stage has is the job details page, which shows a progress bar of completed tasks per stage, as seen below.
You can see that the first stage of this job only uses 3 tasks, which means that increasing the number of executors beyond 3 will never improve the performance of this stage. The second stage, however, does use 200 tasks, so we could increase the number of executors up to 200 and still improve the overall runtime.
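If you'd like to confirm the task count without clicking through the UI, check the partition count of the data feeding the stage, since that is what caps the number of tasks. This is a sketch with placeholder data, assuming a spark-shell style session; the default of 200 shuffle partitions is likely where the second stage's 200 tasks come from:

```scala
// Hypothetical sketch: the number of tasks in a stage is capped by
// the number of partitions of the data it reads.
val df = spark.range(0, 1000000, 1, 3).toDF("id")    // placeholder data with 3 partitions
println(s"partitions: ${df.rdd.getNumPartitions}")   // 3 => at most 3 parallel tasks

// Repartitioning (a shuffle) gives downstream stages more tasks to spread across executors.
val widened = df.repartition(12)                     // placeholder target count
println(s"after repartition: ${widened.rdd.getNumPartitions}")

// Stages produced by shuffles (joins, aggregations) get their task count from this
// setting, whose default is 200.
spark.conf.set("spark.sql.shuffle.partitions", "200")
```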
Given that the stage we can still scale out is already much faster than the other stage, however, adding more executors likely won't yield enough of a benefit to be worth it. Always keep in mind the balance between the resources you spend and the benefit you get from them, to avoid waste. If we were currently only just barely under our SLA, adding more executors could keep us under it with some room to spare. But based on the run above, it is physically impossible to save more than 13 seconds that way, and the real number is likely closer to 5 seconds.
Conclusion
As I stated last week, increasing the number of executors will often improve your performance. You shouldn't do it blindly, however, and hopefully the scenarios above give you some pause before you reach for it next time. These scenarios are not the only ones possible, but they demonstrate the variety of ways this can happen and give you ideas for what to look for as you work on optimizing your jobs.