In this report I describe the mechanisms that provide fault tolerance in Flume, as well as their impact on scalability as the number of flows grows and bottlenecks appear.
In this context, reliability is the ability to keep delivering events and storing them in HDFS in the face of failures, without losing data. These failures can be caused by hardware faults, scarce bandwidth or memory, software crashes, etc.
To eliminate single points of failure, previous versions of Flume had a failover mechanism that moved a flow to a new agent without user intervention. This was done through a Flume Master node that held the global state of all data flows and could dynamically reconfigure nodes and dataflows through failover chains.
The problem with this implementation was that the Flume Master became a bottleneck of the system (although it could be replicated through secondary Flume Master nodes), and its behavior became complicated to configure for larger sets of dataflows. Both issues have a clear impact on the scalability of the system.
The current version of Flume addresses this problem by multiplexing or replicating flows. This means that an agent can either split its data across different channels to balance the load, or replicate the same flow through two different channels. This solution does provide the desired reliability, but it either duplicates the information in the system or needs more agents to balance the load.
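To make the difference between the two modes concrete, the Python model below shows what happens to each event under a replicating and under a multiplexing policy. It is a simplified model, not Flume's Java API; in an agent configuration these behaviors correspond to `selector.type = replicating` (the default) and `selector.type = multiplexing`. The header name `route` and the channel names `c1`/`c2` are hypothetical.

```python
from collections import defaultdict


def replicate(event, channels):
    """Replicating policy: every channel receives a copy of the event."""
    return {ch: [event] for ch in channels}


def multiplex(event, header, mapping, default):
    """Multiplexing policy: the value of one header selects a single channel."""
    value = event["headers"].get(header)
    return {mapping.get(value, default): [event]}


def dispatch(events, select):
    """Apply a selection policy to a stream and collect what each channel stores."""
    stored = defaultdict(list)
    for event in events:
        for channel, batch in select(event).items():
            stored[channel].extend(batch)
    return dict(stored)


events = [
    {"headers": {"route": "a"}, "body": "1"},
    {"headers": {"route": "b"}, "body": "2"},
    {"headers": {}, "body": "3"},  # no header: falls back to the default channel
]

# Replication doubles the stored data: each event lands in both channels.
replicated = dispatch(events, lambda e: replicate(e, ["c1", "c2"]))
# Multiplexing splits the flow: each event lands in exactly one channel.
multiplexed = dispatch(
    events, lambda e: multiplex(e, "route", {"a": "c1", "b": "c2"}, "c1")
)
print({ch: len(evs) for ch, evs in replicated.items()})   # {'c1': 3, 'c2': 3}
print({ch: len(evs) for ch, evs in multiplexed.items()})  # {'c1': 2, 'c2': 1}
```

The model makes the trade-off in the paragraph above visible: replication costs storage and bandwidth, while splitting only helps if there are enough downstream agents to absorb each part. In Flume NG, spreading load over several sinks is typically configured with a sink group and the `load_balance` (or `failover`) sink processor rather than with a channel selector.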
To show the impact of failures on a Flume architecture, I chose a scenario in which the sources are updated very often and are volatile. This case is relevant for failures of the agents that gather data from the sources: events produced while such an agent is down are overwritten before anyone reads them, so they are lost. Failures of the Collectors, on the other hand, can be tolerated in the newer version through persistent channels.
Several experiments were performed to compare memory channels with persistent JDBC channels. A new mechanism for tolerating failures is also proposed and tested against the existing architectures.
Let's start by describing the architecture of the Flume-based system shown in figure 1. As can be seen, there are 5 distinct agents, 2 of which act as collectors. The sources Source1, Source2 and Source3 were three separate C applications that generated sequences of numbers and wrote them to a file. To achieve the volatility property, each file contained only the current sequence number at any given moment. All the agents ran on different machines.
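The original generators were C programs whose code is not part of this report. The Python sketch below only reproduces the volatility property under stated assumptions (one file per source, one number written per tick, and a reader that polls once per tick while it is up; the file name is hypothetical). Because each write overwrites the previous number, anything generated while the reader is down is never seen.

```python
import os
import tempfile


def write_current(path, seq):
    """Overwrite the file so that it only ever holds the latest sequence number."""
    with open(path, "w") as f:
        f.write(f"{seq}\n")


def read_current(path):
    """Read the single number currently stored in the file."""
    with open(path) as f:
        return int(f.read().strip())


def run(ticks, reader_down):
    """Generate one number per tick; the reader polls on every tick it is up."""
    seen = []
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "source1.txt")  # hypothetical file name
        for tick in range(ticks):
            write_current(path, tick)
            if tick not in reader_down:
                seen.append(read_current(path))
    lost = [seq for seq in range(ticks) if seq not in seen]
    return seen, lost


# The gathering agent is down during ticks 4, 5 and 6.
seen, lost = run(10, reader_down=range(4, 7))
print(lost)  # [4, 5, 6]
```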
Figure 1. Implemented Flume-based architecture.
Given the probable reasons, mentioned above, why the Master node was dropped in Flume NG, this report evaluates an alternative architecture for achieving reliability in Flume.
To make the system less centralized, the idea is to form small clusters whose nodes are responsible for keeping track of each other. If one of them fails, another takes over its responsibilities, either gathering data from a source or aggregating data from other agents.
Figure 2. Representation of the clustering of Agents.
Red arrows represent ping messages; black dotted arrows represent possible reconfigurations.
Following the architecture depicted in figure 2, one cluster could consist of Agent11, Agent12 and Agent21, and another of Collector1 (C1) and Collector2 (C2). For example, if Agent11 fails, Agent12 or Agent21 takes its place gathering data from Source1 until Agent11 restarts. If Collector2 fails, Collector1 aggregates the data coming from Agent21.
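The takeover rule in this example can be sketched as follows. This is a hypothetical model, not the scripts used in the experiments: the cluster lists follow figure 2, but the duty map is partly assumed (only Agent11 reading Source1 and Collector2 aggregating Agent21 come from the text), and the rule "first alive peer in cluster order inherits the duties" is an illustrative choice.

```python
# Cluster membership from the example above.
CLUSTERS = [
    ["Agent11", "Agent12", "Agent21"],
    ["Collector1", "Collector2"],
]

# What each node is responsible for. Only Agent11 -> Source1 and
# Collector2 -> Agent21 come from the text; the rest are assumptions.
DUTIES = {
    "Agent11": ["Source1"],
    "Agent12": ["Source2"],
    "Agent21": ["Source3"],
    "Collector1": ["Agent11", "Agent12"],
    "Collector2": ["Agent21"],
}


def take_over(failed, alive, clusters, duties):
    """Hand the failed node's duties to the first alive peer in its cluster.

    Returns the chosen peer and a new duty map. The original map is left
    untouched, so going back to it models the failed node restarting.
    """
    for cluster in clusters:
        if failed in cluster:
            peers = [node for node in cluster if node != failed and node in alive]
            if not peers:
                return None, duties  # no survivor in this cluster
            heir = peers[0]
            new_duties = {node: list(tasks) for node, tasks in duties.items()}
            new_duties[heir].extend(new_duties.pop(failed))
            return heir, new_duties
    raise ValueError(f"{failed} is not in any cluster")


alive = {"Agent12", "Agent21", "Collector1", "Collector2"}
heir, duties = take_over("Agent11", alive, CLUSTERS, DUTIES)
print(heir, duties[heir])  # Agent12 ['Source2', 'Source1']
```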
For this purpose, every agent in a cluster must know about the other agents in the same cluster, including their sources and sinks. In this experiment, the agents pinged each other to keep track of which agents were alive.
This notion of multiple small clusters, shown in figure 2, makes the system less dependent on a centralized entity and also removes the need for extra nodes. However, it may not provide some of the consistency properties that could be achieved with the embedded ZooKeeper.
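A minimal sketch of the ping-based failure detection, assuming each agent records the time of the last ping received from every peer and suspects a peer once it has been silent for longer than a timeout. The class name and the 3-second timeout are hypothetical; the report does not state the ping interval used.

```python
class HeartbeatTracker:
    """Tracks the last ping received from each peer in the same cluster."""

    def __init__(self, peers, timeout, start=0.0):
        self.timeout = timeout
        self.last_seen = {peer: start for peer in peers}

    def on_ping(self, peer, now):
        """Record a ping from `peer` received at time `now` (seconds)."""
        self.last_seen[peer] = now

    def failed_peers(self, now):
        """Peers that have been silent for longer than the timeout."""
        return sorted(
            peer for peer, seen in self.last_seen.items() if now - seen > self.timeout
        )


# Agent11's view of its cluster; the 3-second timeout is an assumption.
tracker = HeartbeatTracker(["Agent12", "Agent21"], timeout=3.0)
tracker.on_ping("Agent12", now=5.0)
tracker.on_ping("Agent21", now=1.0)
print(tracker.failed_peers(now=6.0))  # ['Agent21']
```

In a running agent, `now` would come from a monotonic clock such as `time.monotonic()`. The timeout is the key design choice: if it is too short, a slow network looks like a crash and two agents may end up reading the same source; if it is too long, the takeover is delayed and, with volatile sources, more events are lost.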
Experiments and Results
Each experiment ran for a few hours, generating up to 10 thousand events. The goal was to make these periods as long as possible given the costs and other scheduling constraints.
Normal execution of Flume
The first experiment consisted of the normal execution of Flume, without any failures and using memory channels.
As expected, during normal execution all the events generated by the sources were stored in HDFS. This means that the rate at which data was generated was well within the capacity of the memory channels.
Collector fails while using memory channels
The second experiment consisted of disconnecting Collector2 while Flume was running and recording the lost events.
Although I initially expected that the failure of a collector would cause the agent to lose the data read from the source because of the limited channel capacity, in most tests Flume was able to buffer this data and resend it once the Collector was restarted.
Flume uses a transactional approach to guarantee reliable delivery of events: an event is only removed from a channel once it has been successfully stored in the channel of the next hop.
This still depends on the channel capacity, but with the 10 thousand event capacity used in this implementation, Collector2 could be down for more than one hour without any channel overflow.
I then reduced this capacity to 100 events and doubled the rate of data generation. After this change, I disconnected Collector2 until Agent21 had read more than 100 events. When Collector2 came back online, it received the 100 events stored in Agent21's channel but failed to deliver the subsequent events. In fact, it stopped working from that point and required a restart.
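In Flume's Java API this is expressed through `Channel.getTransaction()` and the `begin()`, `commit()`, `rollback()` and `close()` methods of `Transaction`. The Python model below only mimics the idea and simplifies it (in Flume the downstream put happens in another agent, across an RPC): a batch taken from the upstream channel becomes final only if the put into the downstream channel succeeds; otherwise it is rolled back in its original order. The class and function names are hypothetical.

```python
from collections import deque


class ChannelFull(Exception):
    """Raised when a put would exceed the channel capacity."""


class Channel:
    def __init__(self, capacity):
        self.capacity = capacity
        self.queue = deque()

    def put_all(self, events):
        """All-or-nothing put: either every event fits or none is stored."""
        if len(self.queue) + len(events) > self.capacity:
            raise ChannelFull()
        self.queue.extend(events)


def deliver(upstream, downstream, batch_size, downstream_up=True):
    """Move one batch; events leave `upstream` only if the next hop stored them."""
    n = min(batch_size, len(upstream.queue))
    batch = [upstream.queue.popleft() for _ in range(n)]  # take, not yet final
    try:
        if not downstream_up:
            raise ConnectionError("next hop unreachable")
        downstream.put_all(batch)  # put succeeded: the take becomes final
        return len(batch)
    except (ConnectionError, ChannelFull):
        upstream.queue.extendleft(reversed(batch))  # roll back, keeping order
        return 0


agent = Channel(capacity=10)
agent.put_all(list(range(5)))
collector = Channel(capacity=10)
print(deliver(agent, collector, 3, downstream_up=False), list(agent.queue))  # 0 [0, 1, 2, 3, 4]
print(deliver(agent, collector, 3), list(collector.queue))                  # 3 [0, 1, 2]
```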
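Assuming a constant input rate and that the agent's channel is the only buffer (the memory channel's `transactionCapacity` and any buffering in the source are ignored), the outage a channel can absorb is its free capacity divided by the input rate. Applied to the observation above, surviving more than one hour with a capacity of 10,000 events implies an average input rate below roughly 2.8 events per second. The function names are hypothetical.

```python
def max_outage_seconds(capacity, rate_per_s, already_buffered=0):
    """Seconds until an undrained channel overflows at a constant input rate."""
    if rate_per_s <= 0:
        return float("inf")
    return (capacity - already_buffered) / rate_per_s


def max_rate_for_outage(capacity, outage_s):
    """Highest constant input rate a channel can absorb for a given outage."""
    return capacity / outage_s


print(round(max_rate_for_outage(10_000, 3600), 2))  # 2.78 events/s for one hour
print(max_outage_seconds(100, 2.0))                 # 50.0 s at a hypothetical 2 events/s
```

The same arithmetic explains the next test: shrinking the capacity a hundredfold and doubling the rate cuts the tolerable outage by a factor of 200.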
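The overflow itself can be modeled as a bounded buffer that nothing drains while the next hop is down: once it holds `capacity` events, every further put is rejected. In Flume, a put into a full memory channel fails with a `ChannelException`, and what happens to that event afterwards depends on the source. The sketch below does not model why Collector2 stopped working afterwards, and the count of 150 produced events is illustrative.

```python
from collections import deque


def buffer_during_outage(capacity, events):
    """Offer events to a channel that nothing drains while the next hop is down.

    Returns (buffered, rejected). A rejected put stands in for the failure a
    full Flume memory channel reports.
    """
    channel, rejected = deque(), []
    for event in events:
        if len(channel) < capacity:
            channel.append(event)
        else:
            rejected.append(event)
    return list(channel), rejected


# 150 events produced during the outage, with the capacity reduced to 100.
buffered, rejected = buffer_during_outage(100, range(150))
print(len(buffered), len(rejected), rejected[0])  # 100 50 100
```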
Collector fails while using persistent channels
The third experiment consisted of disconnecting Collector2 while Flume was running with JDBC channels and recording the lost events.
As expected, since the JDBC channel stores events in a relational database instead of in memory, when a collector dies the events are kept and delivered once it becomes available again. This is indeed how Flume achieves recoverability. Although it works well, the memory channel appeared to group events before sending them, whereas the JDBC channel behaved more like a pipeline, sending events one by one. This, together with the inherent cost of writing to persistent storage, might have a significant impact on performance in large-scale systems.
Probably for the same reason, the data flows from the different sources reached the Collector at imbalanced rates. Of roughly every 10 events that reached the Collector, only 2 came from Source1 and 8 from Source2, even though both sources produced data at the same rate. It is unclear what would happen over long runs; in my case, after about an hour the difference had already reached around 400 events. There seemed to be no option to group events.
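A rough cost model shows why sending events one by one scales worse than batching when each storage transaction has a fixed overhead. The per-commit and per-event costs below are illustrative assumptions, not measurements from these experiments; only the shape of the result matters.

```python
import math


def commits_needed(num_events, batch_size):
    """Storage transactions needed to move `num_events` in batches."""
    return math.ceil(num_events / batch_size)


def transfer_cost_us(num_events, batch_size, per_commit_us, per_event_us):
    """Hypothetical cost model: a fixed cost per commit plus a cost per event."""
    return commits_needed(num_events, batch_size) * per_commit_us + num_events * per_event_us


# Illustrative costs only (5 ms per commit, 0.1 ms per event), not measurements.
one_by_one = transfer_cost_us(10_000, 1, per_commit_us=5_000, per_event_us=100)
batched = transfer_cost_us(10_000, 100, per_commit_us=5_000, per_event_us=100)
print(one_by_one // 1000, batched // 1000)  # 51000 1500  (milliseconds)
```

With one event per transaction, the fixed commit cost dominates; batching amortizes it, which is the behavior the memory channel appeared to show.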
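One way to quantify this imbalance is to compare, per source, how many events were produced with how many reached the Collector. The sketch below is hypothetical and uses illustrative counts following the observed 2-in-10 versus 8-in-10 pattern. If both sources produce at the same rate and Source2's events are all delivered, Source1 falls behind by 6 events for every 10 that reach the Collector.

```python
from collections import Counter


def per_source_lag(produced, delivered):
    """Events produced by each source that have not reached the Collector yet."""
    return {src: produced[src] - delivered.get(src, 0) for src in produced}


def delivered_shares(delivered):
    """Fraction of the delivered events that came from each source."""
    total = sum(delivered.values())
    return {src: n / total for src, n in delivered.items()}


# Arrivals in the observed pattern; counts are illustrative.
arrivals = ["Source1"] * 2 + ["Source2"] * 8
delivered = Counter(arrivals)
produced = {"Source1": 8, "Source2": 8}  # both sources produce at the same rate
print(delivered_shares(delivered))          # {'Source1': 0.2, 'Source2': 0.8}
print(per_source_lag(produced, delivered))  # {'Source1': 6, 'Source2': 0}
```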
Agent fails while using persistent channels
In this experiment, Agent21 is disconnected while using JDBC channels.
Although I expected a stopped Agent to lose some events, it apparently still records them somehow and, after being restarted, resends all the events since the beginning, repeating up to hundreds of events in the process. The experiments suggest that the JDBC channel keeps events even when the agent crashes. Other channels, such as the RecoverableMemoryChannel, provide parameters to set how often a background worker checks old logs, but no such option is mentioned for the JDBC channel. Overall, although it repeated hundreds if not thousands of events, it did not lose a single event. This would not be the case if the whole machine crashed, but I could not test that scenario since my source ran on the same node as the agent. Further testing would require a different architecture.
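Since the replay after the restart makes delivery at-least-once, duplicates could be filtered downstream, for example as a post-processing step on the data stored in HDFS. Because every source here emits sequence numbers, a hypothetical deduplicator can key each event on its (source, sequence number) pair. This is an illustration, not a Flume feature, and the source name in the example is arbitrary.

```python
def deduplicate(events):
    """Drop replayed events, keyed on (source, sequence number).

    Keeps the first copy of each event and preserves arrival order. The `seen`
    set grows with the number of distinct events, so a long-running job would
    need to bound it (e.g. with a per-source high-water mark if ordering holds).
    """
    seen = set()
    unique = []
    for source, seq in events:
        key = (source, seq)
        if key not in seen:
            seen.add(key)
            unique.append(key)
    return unique


# A restart replays events 1-3 after they were already delivered.
arrivals = [("Source3", 1), ("Source3", 2), ("Source3", 3),
            ("Source3", 1), ("Source3", 2), ("Source3", 3), ("Source3", 4)]
print(deduplicate(arrivals))
# [('Source3', 1), ('Source3', 2), ('Source3', 3), ('Source3', 4)]
```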
In this last experiment, which uses the proposed clustering architecture with memory channels, both Collector2 and Agent11 are disconnected.
In this implementation, every time a new configuration is generated and the agent is restarted, the events held in memory are lost. This happens when Collector2 is disconnected and Agent21 has to redirect its dataflow to Collector1. Overall, some events were lost while switching between configurations, but the results were better than those of Flume's standard reliability scheme with memory channels. Although the JDBC channels probably achieve better reliability for general-purpose use, clustering introduces less delay in retrieving data. On the other hand, if failures cause all flows to be routed through the same nodes, the data may not flow in the most efficient way.
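One possible mitigation for the last concern is to reroute each orphaned flow to the least-loaded surviving collector instead of always to the same one. The sketch below is hypothetical: the experiment only had two collectors, so a third collector (`Collector3`) and a fourth agent (`Agent22`) are invented to make the choice non-trivial.

```python
def reroute(flows, failed, alive):
    """Reassign flows whose collector failed, preferring the least-loaded survivor.

    `flows` maps each sending agent to the collector it currently uses.
    Ties are broken by name so the result is deterministic.
    """
    new_flows = {agent: c for agent, c in flows.items() if c != failed}
    load = {c: 0 for c in alive}
    for c in new_flows.values():
        load[c] = load.get(c, 0) + 1
    for agent, c in flows.items():
        if c == failed:
            target = min(alive, key=lambda node: (load[node], node))
            new_flows[agent] = target
            load[target] += 1
    return new_flows


# Hypothetical topology with a third collector and a fourth agent.
flows = {"Agent11": "Collector1", "Agent12": "Collector2",
         "Agent21": "Collector2", "Agent22": "Collector3"}
result = reroute(flows, "Collector2", {"Collector1", "Collector3"})
print(sorted(result.items()))
# [('Agent11', 'Collector1'), ('Agent12', 'Collector1'),
#  ('Agent21', 'Collector3'), ('Agent22', 'Collector3')]
```

Sending both orphaned flows to the first alive collector would leave it with three flows and the other with one; the least-loaded rule keeps them at two each.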
As long as the data generation rate does not exceed the available channel capacity, Flume works well in most cases, with or without failures. One failure that probably cannot be handled without replicated flows is the “unsubscribing” from a source caused by the failure of a machine (not only the agent) where the source holds volatile information. Further experiments should be conducted to compare the performance of clusters with that of replicating data flows and the subsequent processing needed to store them in HDFS. In terms of scalability, well-implemented clusters would seem to require fewer nodes and less traffic, since in a real system the ping/heartbeat rate is much lower than the rate of a data flow. Still, Flume's approach to reliability has the merit of simplicity.
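The scalability argument can be illustrated with a back-of-the-envelope comparison, assuming every cluster member pings every other member at a fixed interval. The cluster size, ping interval and event rate below are hypothetical.

```python
def replication_overhead(event_rate, copies=2):
    """Extra events per second carried when each flow is sent `copies` times."""
    return event_rate * (copies - 1)


def heartbeat_overhead(cluster_size, ping_interval_s):
    """Ping messages per second when every member pings every other member."""
    return cluster_size * (cluster_size - 1) / ping_interval_s


# Hypothetical figures: a 3-agent cluster pinging every 5 s versus
# replicating a flow of 100 events per second.
print(heartbeat_overhead(3, 5))    # 1.2
print(replication_overhead(100))   # 100
```

The heartbeat cost depends only on cluster size and ping interval, while the replication cost grows with the data rate; pings are also typically much smaller than events. The quadratic term is one reason to keep clusters small.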
The proposed architecture was implemented through multiple scripts instead of changing the Flume source code. This required some workarounds, such as restarting agents and reloading configurations, which might introduce errors into the experiments. Moreover, these scripts would never provide easy-to-use access to this routing mechanism. That said, more experiments would be needed to make this report more conclusive. Even so, it gives an interesting overview of the mechanisms for achieving reliability in Flume and describes their limitations.
Dynamic routing in the current Flume architecture could also be implemented by using an Avro client to set different headers depending on the intended route, combined with a multiplexing channel selector on the receiving agent. This could be a way to implement the proposed architecture.
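As a sketch of this idea, the helpers below generate the three pieces involved: a header file with one `key=value` pair per line, the argument list for `flume-ng avro-client` (`-H` host, `-p` port, `-F` file, `-R` header file), and the multiplexing selector properties for the receiving agent's source. The agent, source, header, channel and file names are all hypothetical, and the command is only built, not executed.

```python
def header_file(headers):
    """Render event headers as one key=value pair per line."""
    return "".join(f"{key}={value}\n" for key, value in headers.items())


def avro_client_cmd(host, port, data_file, header_path):
    """Argument list for `flume-ng avro-client`; it is built, not executed."""
    return ["flume-ng", "avro-client", "-H", host, "-p", str(port),
            "-F", data_file, "-R", header_path]


def multiplexing_selector(agent, source, header, mapping, default):
    """Properties lines that route events to channels by one header value."""
    prefix = f"{agent}.sources.{source}.selector"
    lines = [f"{prefix}.type = multiplexing", f"{prefix}.header = {header}"]
    lines += [f"{prefix}.mapping.{value} = {channel}" for value, channel in mapping.items()]
    lines.append(f"{prefix}.default = {default}")
    return "\n".join(lines)


# All names below (agent, source, header, channels, paths) are hypothetical.
print(header_file({"route": "collector1"}), end="")
print(" ".join(avro_client_cmd("agent21-host", 41414, "events.log", "headers.txt")))
print(multiplexing_selector("a1", "r1", "route",
                            {"collector1": "c1", "collector2": "c2"}, "c1"))
```

Each channel (`c1`, `c2`) would then feed a sink pointing at a different collector, so changing the header value in the file changes the route without restarting the receiving agent.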