Debugging From The Field: When Parallelization Goes Wrong
This is one of a series of posts revolving around debugging stories from the field. The goal of this series is to help demonstrate how to debug issues in a Hadoop application, as well as show some of the decision-making processes that go into diagnosing issues. I encourage you to look past just the problem and the solution and instead focus on the process. Feel free to post comments on possibilities you think we missed, or questions on why we went a certain direction so that we can all learn from each other! All names are omitted in order to protect the innocent and not-so-innocent.
I was recently included in an email chain about some failures that were occurring for a Spark Streaming job. This job was supposed to take messages from Kafka and send them to a REST API. Despite the simplicity, this system had already had other issues present themselves.
The email list included three exceptions they found in the logs. Disappointingly, all three of them ended up being red herrings, describing failures at too high of a level to be useful or in a different part of the system completely. Luckily, we did have a timestamp to go on, which helped the efforts in finding the right information.
Collecting the Errors
The first step we needed to take was to get a listing of all of the exception call stacks and other worrisome log messages in the executor logs. In this cluster, like many clusters, the YARN Resource Manager UI has issues displaying logs to the user. Even if we had had this ability, based on the fact that this is a Spark Streaming job, the logs would've been too long to properly analyze without downloading first.
Based on this observation, I logged into one of the nodes on the cluster and ran the following command. Most of you will recognize this command downloads the logs for all containers which ran as part of the given YARN job.
Sadly, this failed due to running out of disk space on the node. In the past, we've used the size command to see only a certain amount of logs for each container, but this time we can do one better. Looking at the executors tab in the Spark History Server, we found that only five containers actually failed. Based on this, we just needed to download the logs for those five containers and analyze them. Additionally, since we didn't need to correlate timestamps, we could do one at a time, to avoid running out of hard drive space. Below is an example of what this looks like. Note the list of executors at the bottom, with the column for failed tasks. Any row that does not have a zero value there needs to be investigated.
For each of the hosts which had the failure, we now needed to determine the YARN container ID, so we could get the logs for that container. To do this, we can use the yarn logs command, like the following.
This returns a list of all of the container IDs that ran, along with the node they ran on. The output is even sorted by the hostname! Based on this output, we were able to find a line like the following for each host we wanted to investigate.
We then could take that container ID, and run the following command to get all logs for that container.
Now We're Getting Somewhere!
For each executor's logs, we searched for all logs within the minute that the issue occurred. We used a command similar to the following grep command to do this. Note the use of --before-context and --after-context. This allows us to see log messages that happened during this time period, but don't have the timestamp on that line. This includes things like call stacks for exceptions, which are obviously very important. The values themselves are just estimates, and you can increase or decrease them as necessary.
Going through the executor logs, we found a lot more red herrings, but one call stack stood out to me:
In a Spark job, you shouldn't be handling multiple threads yourself, instead of relying on Spark to do this for you. Based on that assumption, this isn't something you'd expect. I checked the code and verified they were using the standard method for ingesting Kafka data into a Spark Streaming job.
So at this point, we were stuck. There was no obvious use of multiple threads by Spark, yet we were getting errors for using Kafka in a multi-threaded environment. At this point, I decided to just try googling the exact error message as above, and see what we got. Obviously we knew what the error message meant, but we hoped that someone else had had the issue and resolved it.
And luckily, Stack Overflow came to the rescue. We found a post with exactly this issue. It pointed to a Spark issue which had been opened for Spark 2.1 (the version we were running) and was fixed in Spark 2.4. While the issue was explicitly when using the windowing feature, reading through the comments it was easy to see the issue was general, whenever you were using multiple executor cores in a Kafka streaming project.
Checking the spark-submit command for this application, it was set to the following, which does use four cores per executor.
Based on the issue, we needed to reduce the number of cores to 1. We wanted to continue having the same parallelism and not use any more memory, however, so we needed to adjust the other parameters as well. To do this, we multiply the number of executors by the old number of cores. This allows us to do the same number of tasks in parallel. To prevent overuse of resources, we then divided the executor memory by the old number of cores, giving us 1.5 GB per executor. So the final parameters look like the following:
Making this change, we were able to run the job without seeing this failure again. The other failures we had seen shifted a bit and required more investigation (that I might discuss in another post), but we no longer had issues with tasks not being able to read Kafka due to concurrency issues.
This issue was a fairly straightforward issue but ended up being a little known bug in the Spark-Kafka integration. That said, it's important to note weeding through the red herrings and false positives to find the error that was actually the (main) issue. This is a skill that is learned more than taught, but it's always good to consider where a given error is happening, and whether it makes sense it is anything that you could've triggered. Sometimes, if you're unsure, googling the error is the step you need to weed out the fake errors.