Debugging From The Field: The Case of the Empty Files
This is one of a series of posts revolving around debugging stories from the field. The goal of this series is to help demonstrate how to debug issues in a Hadoop application, as well as show some of the decision-making processes that go into diagnosing issues. I encourage you to look past just the problem and the solution and instead focus on the process. Feel free to post comments on possibilities you think we missed, or questions on why we went a certain direction so that we can all learn from each other! All names are omitted in order to protect the innocent and not-so-innocent.
A team at a client was using Spark to read from a Kafka topic and write the data to HDFS. On the surface, this seems like a pretty trivial setup, but they were occasionally seeing output files that were completely empty: zero bytes, with nothing printed when you cat the file in a terminal.
Since this meant data was being lost, it was a serious problem that warranted immediate investigation.
Tracing one of the lost datasets, they found the data present and correct in Kafka. This meant the issue was in the Spark job that read data from Kafka and wrote it to HDFS.
The team looked at the logs for the runs which generated these empty files, and the only error messages were InterruptedExceptions, similar to the following.
These appeared in many different places, but most often during HDFS operations. Interestingly, the job did not fail despite all of these exceptions.
The cluster had a history of NameNode failures, causing a swap between the secondary and primary NameNodes. Because of this, the NameNode was considered the primary suspect. At this point I was brought in to investigate whether this hypothesis held water.
First Suspect: NameNode
Once I was brought up to speed, my first request was to see the timestamps of the last few instances of this issue. If it was indeed due to NameNode failover, then you would expect it to happen exclusively when a failover occurred. The team found these easily by taking the timestamps of the empty files and correlating them with job instances to find which jobs caused the issues. The times when these jobs were running were the times during which we would want to see a NameNode failover.
Finding when the NameNode failed over was surprisingly hard, however. Tools like Grafana and SmartSense don't seem to track this out of the box. I didn't want to try to add a dashboard to Grafana to track this, given the amount of effort that would entail.
Instead, I decided I would find a way to grep the logs for a log message indicating a failover and use that to find all of the failovers in the given time periods. First, however, I needed a sample to use. The simplest way to do this was to figure out when the last failover occurred, and look for a good, unique line I could grep for other failovers.
The nature of the failover that we were seeing in the NameNode required the process to be restarted afterward. Because of this, it was pretty easy to check the secondary NameNode's uptime and look at the log messages just before it started. Those messages would be from right when the failover occurred, which would make finding the log message we needed easy.
As I started this process, however, I quickly realized something: the timestamp of the last failover was prior to the last instance of an empty file being generated. That meant the last empty file couldn't have been caused by the NameNode failing over. Because of this realization, I didn't have to do any more investigation. This issue wasn't caused by the NameNode failure. It also meant we didn't have any more leads.
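The check described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the marker string is a placeholder rather than a real NameNode message, the timestamp layout is assumed, and the sample log lines and job windows are made up.

```python
import re
from datetime import datetime

# ASSUMPTION: placeholder text, not a real NameNode message. Replace it with the
# unique line you found next to a known failover in your own logs.
FAILOVER_MARKER = "FAILOVER_MARKER_FROM_YOUR_LOGS"

# ASSUMPTION: log lines start with a timestamp such as "2019-03-04 10:15:30,123".
TS_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")
TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def failover_times(log_lines, marker=FAILOVER_MARKER):
    """Return the timestamps of every log line containing the marker."""
    times = []
    for line in log_lines:
        if marker not in line:
            continue
        match = TS_PATTERN.match(line)
        if match:
            times.append(datetime.strptime(match.group(1), TS_FORMAT))
    return times


def runs_overlapping_failover(job_runs, failovers):
    """Return the names of job runs whose (start, end) window contains a failover."""
    return [
        name
        for name, (start, end) in job_runs.items()
        if any(start <= t <= end for t in failovers)
    ]


# Made-up sample data: one failover, and one later job run that produced an empty file.
sample_log = [
    "2019-03-01 02:00:05,001 INFO FAILOVER_MARKER_FROM_YOUR_LOGS",
    "2019-03-01 02:00:06,100 INFO some unrelated message",
]
sample_runs = {
    "run-with-empty-file": (datetime(2019, 3, 4, 1, 0), datetime(2019, 3, 4, 1, 30)),
}
print(runs_overlapping_failover(sample_runs, failover_times(sample_log)))  # prints []
```

An empty result means none of the runs that produced empty files overlapped a failover, which is the same conclusion reached by hand here.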
So Now What?
At this point, we returned to the error message itself. An InterruptedException is thrown when a thread is interrupted while it is blocked or waiting, and in practice it shows up when something is shutting the work down. That can be a Ctrl-C in an interactive bash terminal, or a parent process killing the JVM. Since this was only happening on the executors themselves, it couldn't be a Ctrl-C. The chances of someone logging into these nodes and sending signals manually were also very low, considering it hadn't been seen elsewhere. Additionally, YARN should retry containers that fail like that, and we hadn't seen a retry; the job just kept going.
Given this, I wanted to make sure that there weren't any other exceptions causing the issue. If there were other errors that YARN noticed, it's possible YARN could've killed the container, but not retried it for whatever reason, and not failed the overall job. Looking further up in the logs, however, there was no sign of any issues. The task just got an interruption and quit.
At this point, the last job that we knew had hit the issue was no longer available in the Spark History Server, so we couldn't do any digging there, and there wasn't much more we could do. I asked the team to take a look at the History Server the next time it happened, and to capture screenshots of anything interesting.
In the meantime, I had them send me the code they were running. Sometimes this can give you a good idea of what could go wrong. In this instance, there was one strange situation in particular. They were using the HDFS library to write files inside of the tasks themselves, instead of writing the DataFrame to HDFS using the traditional writer libraries. I talked to them about this, and they stated they did this so they could write to one file and control the name. Otherwise, it would write to as many files as there were tasks, and the name would be a mangled name. I mentioned to them this was highly sub-optimal and suggested they coalesce the DataFrame to a single task and add a second step which would rename the file to whatever they wanted. This would take some time, however, and there was no indication in the logs that this was causing an issue, so it wasn't investigated further.
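The coalesce-then-rename suggestion might look something like the following. This is a minimal sketch, not the team's code: it assumes df is a Spark DataFrame, that the output format is JSON, and that both paths live on a local filesystem. The function name and its parameters are made up for illustration. On HDFS, the rename step would go through the Hadoop FileSystem API rather than pathlib.

```python
from pathlib import Path


def write_single_named_file(df, staging_dir, final_path):
    """Write df as one JSON file via Spark's own writer, then rename it.

    ASSUMPTIONS: df is a Spark DataFrame, JSON is the desired format, and both
    paths are local. This helper is hypothetical, not part of any library.
    """
    # coalesce(1) funnels all rows into one partition, so Spark emits one part file.
    df.coalesce(1).write.mode("overwrite").json(str(staging_dir))

    part_files = sorted(Path(staging_dir).glob("part-*"))
    if len(part_files) != 1:
        raise RuntimeError(f"expected exactly one part file, found {len(part_files)}")

    final_path = Path(final_path)
    final_path.parent.mkdir(parents=True, exist_ok=True)
    # The rename runs once, on the driver, after Spark has committed the output.
    part_files[0].replace(final_path)
    return final_path
```

The point of the design is that the only code touching the final file name runs once on the driver, outside of any task, so a retried or duplicated task cannot clobber it.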
The next week we met, and we walked through the screenshots. As they showed me screenshots, we came across this one.
The note at the far right caught our eye and was interesting, but that didn't tell us much other than that YARN was definitely the one killing these containers. We suspected as much, but now we had proof.
The really interesting piece was the left side, where, under the attempt section, in parentheses, it states speculative.
This refers to Speculative Execution, which is an optimization technique that can be turned on for Spark. In this optimization, tasks that are running noticeably slower than the others in their stage, and so are liable to be slow or fail, are identified, and then a second copy of the same task is run in parallel. Once one of the two tasks succeeds, the other is killed, and we use the results from the successful one. This is great if you have extra executors you can use for longer-running tasks.
Looking at the spark-submit command confirmed that Speculative Execution was enabled for this job.
Combining this with the code, the issue came into focus. Speculative Execution caused multiple tasks running the code which wrote to HDFS to run in parallel for the same piece of data. These tasks had side effects (namely the writing to HDFS), so they overwrote each other. In the middle of this, one of the tasks would finish, and the other would be killed before it completed its write to the file. Thus, you ended up with an empty file that had been created, written to, and then created again (truncating the file).
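The sequence of events can be reproduced in miniature without Spark or HDFS. The sketch below uses a local file and two plain function calls to stand in for the two task attempts. All names here are made up, and Python's open(path, "w") is only an analogy for an overwriting create on HDFS. The second function shows one common way to make such a write safe, assuming an atomic rename is available: write to an attempt-specific temporary file and publish it only on success.

```python
import os
import tempfile


def task_attempt(path, payload, killed_before_write=False):
    """Mimic a task that creates the output file itself and then writes to it."""
    handle = open(path, "w")  # create-or-truncate: existing content is wiped here
    try:
        if killed_before_write:
            return  # the attempt is interrupted here and never writes
        handle.write(payload)
    finally:
        handle.close()


def safer_task_attempt(path, payload, attempt_id, killed_before_write=False):
    """Write to an attempt-specific temp file and publish it only on success."""
    tmp_path = f"{path}.attempt-{attempt_id}"
    with open(tmp_path, "w") as handle:
        if killed_before_write:
            return  # leaves a stray temp file behind, but never touches `path`
        handle.write(payload)
    os.replace(tmp_path, path)  # atomic rename on a local POSIX filesystem


def demo():
    """Return (size after the unsafe sequence, size after the safer sequence)."""
    data = "row-1\nrow-2\n"
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "events.txt")
        task_attempt(target, data)                            # first attempt finishes
        task_attempt(target, data, killed_before_write=True)  # speculative copy is killed
        unsafe_size = os.path.getsize(target)

        safer_task_attempt(target, data, attempt_id=0)
        safer_task_attempt(target, data, attempt_id=1, killed_before_write=True)
        safe_size = os.path.getsize(target)
    return unsafe_size, safe_size


unsafe_size, safe_size = demo()
print(unsafe_size)    # prints 0
print(safe_size > 0)  # prints True
```

The first pair of calls ends with a zero-byte file even though one attempt wrote everything correctly, which matches what the team was seeing.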
Thankfully, the solution to this issue was simple: turn off speculative execution, and the issue resolved itself. A week later I checked in with the team, and no issues had been seen since.
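If you want to check a job for this setting yourself, the property to look for is spark.speculation, which is off by default. Below is a small sketch that scans a spark-submit command line for it. The helper names and the example command are made up, and the helper only sees values passed via --conf. It will not notice a value set in spark-defaults.conf or in the application code.

```python
import shlex


def spark_confs(command):
    """Collect the key=value pairs passed via --conf in a spark-submit command."""
    tokens = shlex.split(command)
    confs = {}
    for flag, value in zip(tokens, tokens[1:]):
        if flag == "--conf" and "=" in value:
            key, _, val = value.partition("=")
            confs[key] = val
    return confs


def speculation_enabled(command):
    """True only if the command explicitly sets spark.speculation to true."""
    # spark.speculation defaults to false, so absence means it is off.
    return spark_confs(command).get("spark.speculation", "false").lower() == "true"


# Hypothetical command, for illustration only.
cmd = "spark-submit --master yarn --conf spark.speculation=true --class com.example.Job job.jar"
print(speculation_enabled(cmd))  # prints True
```

Turning the feature off is then a matter of removing that --conf entry or setting it to false.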
So Should We Just Not Use Speculative Execution?
The first question that the team asked me after we found this issue was when they should use speculative execution. Well-written Spark code should not have side effects in the tasks themselves. Instead, leave that to libraries you use that interface with outside systems. As long as this is followed, Speculative Execution should not cause problems.
That said, when will it help? It helps when you have a skewed runtime that is not constant across the cluster. If one run may run much faster than another, or if running on some nodes will be much slower than running on other nodes, then Speculative Execution can save you a lot of time. Off the top of my head, there aren't a lot of examples of this that would be considered canonical Spark examples.
The closest to a real example I can come up with is a possibly infinitely running job, such as those found in AI, that you run on a row by row basis. Having multiple tasks run at once allows one of those tasks to hopefully finish and return, and you can cancel the other one. Another example might be working with an external system that is known to have network issues with some of the nodes in the cluster. A well-built cluster wouldn't have this issue, but not all clusters are perfect.
Because of this, I personally have never recommended using this tool. But it is important to understand, in case that need does come up for you.
A few lessons stick out from this experience. The first is that when you hit a wall because your suspect doesn't work out, go back to the basics. Take a look at the error message itself and the context in which the error occurs.
Additionally, issues are oftentimes not in a single dimension but instead caused by a confluence of triggers. In this case, just looking at the code didn't immediately show the issue, and looking at the logs didn't either. Instead, understanding the problem points in the code and applying that to the logs revealed the problem. If you only look at one part or another, you aren't going to easily find these sorts of issues.
Finally, when a system is handed off to you, make sure you understand it. Understand the configuration overrides being used, and why they were chosen. If you don't understand something being used, investigate it. While understanding Speculative Execution may not have helped the team solve it themselves, it certainly wouldn't have hurt. Remember, you own the code, so why not understand what you own?