Consistent Core
Maintain a smaller cluster providing stronger consistency to allow a large
data cluster to coordinate server activities without implementing quorum-based
algorithms.
Problem
Linearizability is the strongest consistency guarantee, where all the clients
are guaranteed to see the latest committed updates to data. Providing
linearizability along with fault tolerance needs consensus algorithms like
Raft, Zab or Paxos to be implemented on the servers.
While a consensus algorithm is an essential requirement to implement a
Consistent Core, there are various aspects of client interaction, such as how
a client finds the leader and how duplicate requests are handled, which are
important implementation decisions. There are also some important
implementation considerations regarding safety and liveness. Paxos defines only
the consensus algorithm, and these other implementation aspects are not well
documented in the Paxos literature. Raft very clearly documents various
implementation aspects, along with a reference implementation, and
therefore is the most widely used algorithm today.
When a cluster needs to handle a lot of data, it needs more and more
servers. For a cluster of servers, there are some common requirements, such as
selecting a specific server to be the master for a particular task, managing
group membership information, and mapping data partitions to the servers.
These requirements need a strong consistency guarantee, namely linearizability.
The implementation also needs to be fault tolerant. A common approach is to use
a fault-tolerant consensus algorithm based on Quorum. But in quorum-based
systems throughput degrades with the size of the cluster, since each update has
to be replicated to a majority of the servers before it is committed.
Solution
Implement a smaller, 3 to 5 node cluster which provides a linearizability
guarantee as well as fault tolerance. [1] A separate data
cluster can use the small consistent cluster to manage metadata and to take
cluster-wide decisions with primitives like Lease. This
way, the data cluster can grow to a large number of servers, but can still
perform the actions which need strong consistency guarantees using the smaller
metadata cluster.
Figure 1: Consistent Core
A typical interface of a Consistent Core looks like this:
public interface ConsistentCore {
    CompletableFuture put(String key, String value);

    List<String> get(String keyPrefix);

    CompletableFuture registerLease(String name, long ttl);

    void refreshLease(String name);

    void watch(String name, Consumer<WatchEvent> watchCallback);
}
At the minimum, Consistent Core provides a simple key-value storage mechanism.
It is used to store metadata.
Metadata Storage
The storage is implemented using consensus algorithms such as Raft. It is
an example of a Replicated Write Ahead Log implementation, where replication is
handled by Leader and Followers and High-Water Mark is
used to track the successful replication using Quorum.
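As a rough illustration of how a high-water mark can be derived from a quorum, the sketch below picks the highest log index that is stored on a majority of servers. The class and method names are hypothetical and not taken from any Raft library, and the sketch ignores details a real implementation must handle (for example, Raft only commits entries from the leader's current term this way).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration only.
class HighWaterMark {
    // logIndexes holds the last log index stored on every server in the
    // cluster, the leader included. The result is the highest index that
    // is present on a majority (quorum) of the servers.
    static long compute(List<Long> logIndexes) {
        List<Long> sorted = new ArrayList<>(logIndexes);
        Collections.sort(sorted, Collections.reverseOrder());
        int quorum = sorted.size() / 2 + 1;
        // The quorum-th largest index is held by at least `quorum` servers.
        return sorted.get(quorum - 1);
    }
}
```

With three servers whose logs end at indexes 10, 8 and 5, the quorum is 2, so the high-water mark under this sketch is 8: entries up to index 8 exist on two of the three servers.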
Supporting hierarchical storage
Consistent Core is generally used to store data for things like group
membership or task distribution across servers. A common usage pattern is to
scope the type of metadata with a prefix. For example, for group membership,
the keys will all be stored like /servers/1, /servers/2 etc. For tasks assigned
to servers the keys can be /tasks/task1, /tasks/task2. This data is generally
read by fetching all the keys with a specific prefix. For example, to get
information about all the servers in the cluster, all the keys with prefix
/servers are read.
An example usage is as follows:
The servers can register themselves with the Consistent Core by creating their
own key with prefix /servers.
client1.setValue("/servers/1", "{address:192.168.199.10, port:8000}");
client2.setValue("/servers/2", "{address:192.168.199.11, port:8000}");
client3.setValue("/servers/3", "{address:192.168.199.12, port:8000}");
The clients can then get to know about all the servers in the cluster by
reading with key prefix /servers as follows:
assertEquals(client1.getValue("/servers"),
        Arrays.asList("{address:192.168.199.12, port:8000}",
                "{address:192.168.199.11, port:8000}",
                "{address:192.168.199.10, port:8000}"));
Because of this hierarchical nature of data storage, products like [zookeeper]
and [chubby] provide a file-system-like interface, where users create
directories and files, or nodes, with the concept of parent and child nodes.
[etcd3] has a flat key space with the ability to get a range of keys.
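The prefix reads described above can be sketched on a flat, sorted key space. The example below is a minimal in-memory illustration, assuming a hypothetical PrefixKeyValueStore class backed by java.util.TreeMap. It says nothing about how ZooKeeper, Chubby or etcd store their data internally, and it leaves out replication entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory store; replication is deliberately left out.
class PrefixKeyValueStore {
    // A sorted map keeps all keys sharing a prefix next to each other.
    private final TreeMap<String, String> kv = new TreeMap<>();

    void put(String key, String value) {
        kv.put(key, value);
    }

    // Returns the values of all keys starting with keyPrefix, in key order.
    List<String> get(String keyPrefix) {
        List<String> values = new ArrayList<>();
        // Start at the first key >= keyPrefix and stop at the first key
        // that no longer carries the prefix.
        for (Map.Entry<String, String> entry : kv.tailMap(keyPrefix, true).entrySet()) {
            if (!entry.getKey().startsWith(keyPrefix)) {
                break;
            }
            values.add(entry.getValue());
        }
        return values;
    }
}
```

Note that a plain prefix such as /servers would also match a key like /serversX, so callers of a store like this one may prefer to pass /servers/ with the trailing separator.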
Handling Client Interactions
A key aspect of Consistent Core functionality is how a client interacts with
the core. The following aspects are critical for the clients to work with the
Consistent Core.
Finding the leader
Serializability and Linearizability
When read requests are handled by follower servers, it is possible that
clients can get stale data, as the latest commits from the leader have not
reached the followers. The order in which the updates are received by the
client is still maintained, but the updates might not be the most recent. This
is the [serializability] guarantee as opposed to linearizability.
Linearizability guarantees that every client gets the most recent updates.
Clients can work with the serializability guarantee when they just need to read
metadata and can tolerate stale metadata for a while. For operations like Lease,
linearizability is strictly needed.
If the leader is partitioned from the rest of the cluster, clients can get
stale values from the leader. Raft describes a mechanism to provide
linearizable reads. See, for example, the etcd implementation of readIndex.
A similar situation can happen with followers which are partitioned. A
partitioned follower might not return the latest values to the client. To make
sure that the followers are not partitioned and are up-to-date with the leader,
they need to query the leader to know the latest updates, and wait till they
receive the latest updates before responding to the client. See the proposed
kafka design for example.
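The follower-side wait described above can be sketched as follows. This is a simplified, single-threaded illustration with a hypothetical FollowerReplica class. It assumes the follower has already asked the leader for its current commit index (called readIndex here), and it signals "not caught up yet" with an exception rather than blocking. It is not how etcd or Kafka implement the mechanism, and it omits the step in which the leader confirms it is still the leader before reporting that index.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical follower; networking and log replication are left out.
class FollowerReplica {
    private final Map<String, String> state = new HashMap<>();
    private long appliedIndex = 0;

    // Applies a committed log entry received from the leader.
    void apply(long index, String key, String value) {
        state.put(key, value);
        appliedIndex = index;
    }

    // Serves the read only once this replica has applied every entry up to
    // readIndex, the commit index obtained from the leader for this read.
    String read(String key, long readIndex) {
        if (appliedIndex < readIndex) {
            // In a real server the request would wait here; the sketch
            // just asks the caller to retry.
            throw new IllegalStateException("replica is behind the leader; retry");
        }
        return state.get(key);
    }
}
```

A follower that has applied only entry 1 rejects a read with readIndex 2, and serves it once entry 2 has been applied.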
It's important that all the operations are executed on the leader, so a client
library needs to find the leader server first. There are two approaches
possible to fulfil this requirement.
- The follower servers in the consistent core know about the current leader,
  so if the client connects to a follower, it can return the address of the
  leader. The client can then directly connect to the leader identified in the
  response. It should be noted that the servers might be in the middle of leader
  election when the client tries to connect. In that case, servers cannot return
  the leader address and the client needs to wait and try another server.
- Servers can implement a forwarding mechanism and forward all the client
  requests to the leader. This allows clients to connect to any server. Again, if
  servers are in the middle of leader election, then clients need to retry until
  the leader election is successful and a legitimate leader is established.
Products like zookeeper and etcd implement the forwarding approach because they
allow some read-only requests to be handled by the follower servers; this
avoids a bottleneck on the leader when a large number of clients are read-only.
Forwarding also spares the clients the complexity of choosing whether to
connect to the leader or a follower based on the type of the request.
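The forwarding approach can be sketched in a few lines. The ForwardingServer class below is hypothetical and runs in a single process: a direct method call stands in for the network hop to the leader, and replication is omitted. It only illustrates the decision each server makes for a write request.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical server; a method call stands in for a network request.
class ForwardingServer {
    private final String id;
    private final Map<String, String> store = new HashMap<>();
    // null while a leader election is in progress
    private ForwardingServer leader;

    ForwardingServer(String id) {
        this.id = id;
    }

    void setLeader(ForwardingServer leader) {
        this.leader = leader;
    }

    // Executes the write on the leader and returns the id of the server
    // that executed it.
    String put(String key, String value) {
        if (leader == null) {
            throw new IllegalStateException("leader election in progress; retry");
        }
        if (leader != this) {
            return leader.put(key, value); // forward to the leader
        }
        store.put(key, value);
        return id;
    }
}
```

A client connected to a follower gets the same result as one connected to the leader, and has to retry only while no leader is known.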
A simple mechanism to find the leader is to try to connect to each server and
send it a request. The server responds with a redirect response if it's not the
leader.
private void establishConnectionToLeader(List<InetAddressAndPort> servers) {
    for (InetAddressAndPort server : servers) {
        try {
            SingleSocketChannel socketChannel = new SingleSocketChannel(server, 10);
            logger.info("Trying to connect to " + server);
            RequestOrResponse response = sendConnectRequest(socketChannel);
            if (isRedirectResponse(response)) {
                redirectToLeader(response);
                break;
            } else if (isLookingForLeader(response)) {
                logger.info("Server is looking for leader. Trying next server");
                continue;
            } else { //we know the leader
                logger.info("Found leader. Establishing a new connection.");
                newPipelinedConnection(server);
                break;
            }
        } catch (IOException e) {
            logger.info("Unable to connect to " + server);
            //try next server
        }
    }
}

private boolean isLookingForLeader(RequestOrResponse requestOrResponse) {
    return requestOrResponse.getRequestId() == RequestId.LookingForLeader.getId();
}

private void redirectToLeader(RequestOrResponse response) {
    RedirectToLeaderResponse redirectResponse = deserialize(response);
    newPipelinedConnection(redirectResponse.leaderAddress);
    logger.info("Connected to the new leader "
            + redirectResponse.leaderServerId
            + " " + redirectResponse.leaderAddress
            + ". Checking connection");
}

private boolean isRedirectResponse(RequestOrResponse requestOrResponse) {
    return requestOrResponse.getRequestId() == RequestId.RedirectToLeader.getId();
}
Just establishing a TCP connection is not enough; we need to know if the server
can handle our requests. So clients send a special connection request, which
the server either acknowledges if it can serve the requests or answers with a
redirect to the leader server.
private RequestOrResponse sendConnectRequest(SingleSocketChannel socketChannel) throws IOException {
    RequestOrResponse request
            = new RequestOrResponse(RequestId.ConnectRequest.getId(), "CONNECT", 0);
    try {
        return socketChannel.blockingSend(request);
    } catch (IOException e) {
        resetConnectionToLeader();
        throw e;
    }
}
If an existing leader fails, the same technique is used to identify the newly
elected leader from the cluster.
Once connected, the client maintains a Single Socket Channel to the leader
server.
Handling duplicate requests
In cases of failure, clients may try to connect to the new leader and resend
their requests. But if those requests were already handled by the failed leader
prior to failure, resending might result in duplicates. Therefore, it's
important to have a mechanism on the servers to ignore duplicate requests. The
Idempotent Receiver pattern is used to implement duplicate detection.
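A minimal sketch of duplicate detection is shown below. It assumes that every client has a unique id and numbers its requests, and that the server remembers the response it sent for each (clientId, requestNumber) pair. The IdempotentKeyValueStore class and its method names are hypothetical. For the table of saved responses to survive a leader failure, it would presumably have to be replicated along with the rest of the state, which this sketch does not show.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical server-side store with duplicate detection.
class IdempotentKeyValueStore {
    private final Map<String, String> kv = new HashMap<>();
    // Responses already sent, keyed by "clientId:requestNumber".
    private final Map<String, String> savedResponses = new HashMap<>();
    private int executions = 0;

    String put(String clientId, int requestNumber, String key, String value) {
        String requestKey = clientId + ":" + requestNumber;
        String saved = savedResponses.get(requestKey);
        if (saved != null) {
            return saved; // duplicate: reply again without re-executing
        }
        kv.put(key, value);
        executions++;
        String response = "OK:" + key;
        savedResponses.put(requestKey, response);
        return response;
    }

    // Number of requests that were actually executed.
    int executions() {
        return executions;
    }
}
```

If a client resends request number 1 after a connection failure, it receives the saved response and the write is not applied a second time.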
Coordinating tasks across a set of servers can be done by using Lease. The
same can be used to implement group membership and failure detection mechanisms.
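As an illustration of how a time-bound lease can coordinate servers, the sketch below tracks lease expiry per name. The LeaseTracker class is hypothetical, and the current time is passed in explicitly so that the behaviour is easy to follow; a real Consistent Core would use the leader's own clock and replicate lease state, neither of which is shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical lease bookkeeping; times are in milliseconds.
class LeaseTracker {
    private final Map<String, Long> expiresAt = new HashMap<>();

    // Grants the lease unless an unexpired lease with the same name exists.
    boolean registerLease(String name, long ttlMillis, long nowMillis) {
        if (isActive(name, nowMillis)) {
            return false;
        }
        expiresAt.put(name, nowMillis + ttlMillis);
        return true;
    }

    // Extends a lease that is still active; an expired lease stays expired.
    void refreshLease(String name, long ttlMillis, long nowMillis) {
        if (isActive(name, nowMillis)) {
            expiresAt.put(name, nowMillis + ttlMillis);
        }
    }

    boolean isActive(String name, long nowMillis) {
        Long expiry = expiresAt.get(name);
        return expiry != null && expiry > nowMillis;
    }
}
```

A server that holds a lease such as /servers/1 and keeps refreshing it is considered alive. If it stops refreshing, the lease expires, which other servers can treat as a failure signal, and the name becomes available for registration again.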
State Watch is used to get notifications of changes to the metadata or
time-bound leases.
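The notification mechanism can be sketched as a store that invokes registered callbacks whenever a key under a watched prefix changes. The WatchableStore class is hypothetical, and events are plain strings here rather than the WatchEvent type used in the interface shown earlier, whose fields the text does not define. Delivery over the network and lease-expiry events are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical store that notifies watchers synchronously on each put.
class WatchableStore {
    private final Map<String, String> kv = new HashMap<>();
    // Callbacks registered per watched key prefix.
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    void watch(String keyPrefix, Consumer<String> callback) {
        watches.computeIfAbsent(keyPrefix, k -> new ArrayList<>()).add(callback);
    }

    void put(String key, String value) {
        kv.put(key, value);
        // Notify every watcher whose prefix matches the changed key.
        for (Map.Entry<String, List<Consumer<String>>> entry : watches.entrySet()) {
            if (key.startsWith(entry.getKey())) {
                for (Consumer<String> callback : entry.getValue()) {
                    callback.accept(key + "=" + value);
                }
            }
        }
    }
}
```

A client watching /servers is told when a server registers under /servers/1, but hears nothing about changes under /tasks.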
Examples
Google is known to use the [chubby] lock
service for coordination and metadata management.
[kafka] uses [zookeeper] to
manage metadata and take decisions like leader election for the cluster master.
The proposed architecture
change in Kafka will replace zookeeper with its own [raft] based controller cluster.
[bookkeeper] uses
Zookeeper to manage cluster metadata.
[kubernetes] uses [etcd] for
coordination and for managing cluster metadata and group membership information.
Big data storage and processing systems like [hdfs], [spark] and [flink] use [zookeeper] for high availability and cluster
coordination.