Zookeeper distributed coordination service

Invincible Dragon Warrior 2022-05-22 12:18:12


Why ZooKeeper

ZooKeeper is widely used in distributed environments and offers many useful features, for example a global naming service, a service registry, and global distributed locks.
Previously, when we issued an RPC request, the rpcChannel sent it directly to one known server. In practice that does not hold: services are deployed in a distributed fashion, and all RPC nodes are peers with equal status. A caller that wants to use an RPC service does not know which server the service is running on. So a service configuration center is needed: every available RPC service node registers its service, with its IP and port, in the configuration center.

What ZooKeeper provides

  • A file system
    ZooKeeper provides a multi-level node namespace (the nodes are called znodes). Unlike in a file system, every one of these nodes can carry associated data, whereas in a file system only file nodes can store data and directory nodes cannot. To ensure high throughput and low latency, ZooKeeper keeps this tree structure in memory. That makes it unsuitable for storing large amounts of data: by default each node can hold at most 1 MB.
  • A notification mechanism
    A client sets a watcher on a znode. When that znode changes, the client receives a notification from zk and can then adjust its business logic according to the change.

Common ZooKeeper client commands

ls, get, create, set, delete

The problem with distributed systems

Tight coupling between clients and service providers

Services need dynamic registration and discovery. To support high concurrency, OrderService is deployed as 4 instances. Each client keeps a list of service providers, but the list is static (written in a configuration file). If the providers change, for example some machines go down or another OrderService instance is added, the client does not know. To get the latest list of provider URLs, the configuration file has to be updated by hand, which is very inconvenient.

Solution

Remove the coupling by adding a middle layer: a registry, which holds the names of the services on offer together with their URLs. Services first register themselves in the registry. When a client queries, it only gives the service name and the registry returns a URL. Every client has to ask the registry before accessing a service, so that it always gets the latest address.

Heartbeat (session)

The principle of the heartbeat mechanism is simple: every N seconds the client sends a heartbeat message to the server, and the server replies with the same heartbeat message. If either the server or the client has received no message at all, heartbeat or otherwise, for M seconds (M > N), the heartbeat has timed out and we consider the underlying TCP connection broken.
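
The N/M rule above can be sketched in a few lines. This is only an illustration of the timing logic, not ZooKeeper's implementation; the class name HeartbeatMonitor and its methods are hypothetical, and the caller passes the current time in explicitly so the behaviour is easy to check.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper (not part of ZooKeeper): tracks when we last sent a
// heartbeat and when we last heard from the peer.
class HeartbeatMonitor
{
public:
    using Clock = std::chrono::steady_clock;

    // interval is N (how often to send), timeout is M, and M must exceed N.
    HeartbeatMonitor(std::chrono::seconds interval, std::chrono::seconds timeout,
                     Clock::time_point now)
        : m_interval(interval), m_timeout(timeout),
          m_lastSent(now), m_lastReceived(now)
    {
        assert(timeout > interval);
    }

    // Call whenever any message (heartbeat or not) arrives from the peer.
    void OnMessage(Clock::time_point now) { m_lastReceived = now; }

    // Call after a heartbeat has been sent.
    void OnSent(Clock::time_point now) { m_lastSent = now; }

    // True once N seconds have passed since the last heartbeat we sent.
    bool ShouldSend(Clock::time_point now) const
    {
        return now - m_lastSent >= m_interval;
    }

    // True once nothing has been received for M seconds.
    bool TimedOut(Clock::time_point now) const
    {
        return now - m_lastReceived >= m_timeout;
    }

private:
    std::chrono::seconds m_interval;
    std::chrono::seconds m_timeout;
    Clock::time_point m_lastSent;
    Clock::time_point m_lastReceived;
};
```

With N = 2 and M = 6, a peer that stays silent is declared dead 6 seconds after its last message, and any incoming message resets that countdown.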

Persistent node

If the RPC node times out without sending a heartbeat, zk does not delete this node.

Ephemeral node (what we need)

If the RPC node times out without sending a heartbeat message, zk deletes the ephemeral node automatically.
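
To make the difference concrete, here is a toy in-memory model. It is not the ZooKeeper API: the class ToyRegistry, its methods, and the convention that session id 0 means "persistent" are all assumptions made for this sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Toy model only: a node is either persistent (ownerSession == 0) or
// ephemeral (tied to the session that created it).
class ToyRegistry
{
public:
    void Create(const std::string &path, const std::string &data,
                std::int64_t ownerSession = 0)
    {
        m_nodes[path] = Node{data, ownerSession};
    }

    bool Exists(const std::string &path) const
    {
        return m_nodes.count(path) != 0;
    }

    // Called when a session's heartbeats stop and the session times out:
    // every ephemeral node owned by that session disappears.
    void ExpireSession(std::int64_t session)
    {
        assert(session != 0); // 0 is reserved for persistent nodes here
        for (auto it = m_nodes.begin(); it != m_nodes.end();)
        {
            if (it->second.ownerSession == session)
                it = m_nodes.erase(it);
            else
                ++it;
        }
    }

private:
    struct Node
    {
        std::string data;
        std::int64_t ownerSession;
    };
    std::map<std::string, Node> m_nodes;
};
```

This is why ephemeral nodes suit service registration: when an RPC server crashes, its session expires and its address drops out of the registry without any manual cleanup.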

Watcher mechanism (observer pattern)

ZooKeeper's watch mechanism is used to monitor changes to node data. When the watched data changes, ZooKeeper notifies the client that set the watch, that is, the watcher.

Since a watch listens for data-change events, there are corresponding event types and state types.

  • When a client calls an RPC service, it can combine the service name and the method name into a znode path and ask the service configuration center whether that node exists. If it does, the client reads the ip+port stored there.
  • Watch mechanism: to monitor node changes, the client maintains a map whose key is the node name and whose value is the node's content. If a node changes, zk actively tells the client. This is a callback mechanism.
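
The lookup in the first bullet comes down to two small string operations. The sketch below assumes the path layout "/service/method" and node data of the form "ip:port"; both the layout and the helper names MethodPath and ParseEndpoint are assumptions for illustration, not something ZooKeeper prescribes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical helper: ("UserService", "Login") -> "/UserService/Login".
std::string MethodPath(const std::string &service, const std::string &method)
{
    assert(!service.empty() && !method.empty());
    return "/" + service + "/" + method;
}

// Hypothetical helper: splits "ip:port" as read from the znode.
// Returns false if the data is malformed; ip and port are then left untouched.
bool ParseEndpoint(const std::string &data, std::string &ip, std::uint16_t &port)
{
    const std::size_t idx = data.find(':');
    if (idx == std::string::npos || idx == 0 || idx + 1 >= data.size())
    {
        return false;
    }
    unsigned long value = 0;
    for (std::size_t i = idx + 1; i < data.size(); ++i)
    {
        const char c = data[i];
        if (c < '0' || c > '9')
        {
            return false; // the port must be all digits
        }
        value = value * 10 + static_cast<unsigned long>(c - '0');
        if (value > 65535)
        {
            return false; // out of range for a TCP port
        }
    }
    ip = data.substr(0, idx);
    port = static_cast<std::uint16_t>(value);
    return true;
}
```

A caller would pass MethodPath(...) to the client's read call and feed the returned string to ParseEndpoint before opening the connection.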

Problems with the native ZooKeeper client API

ZooKeeper natively provides C and Java client programming interfaces, but they are relatively complicated to use and have several weaknesses:
1. Heartbeat messages are not sent automatically. <==== Wrong: in the source code, the client sends a ping heartbeat at 1/3 of the timeout.
2. A watcher is one-shot: it must be set again after every trigger.
3. A znode stores only a plain byte array. To store an object, you have to convert the object into a byte array yourself.
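
Point 2 is easiest to see in a toy model. The following is not the ZooKeeper API; ToyWatchedStore and CachingClient are hypothetical names, and the store simply forgets a watch as soon as it fires, so the client has to re-register it every time it refreshes its local map.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Toy model of one-shot watches (not the real client library).
class ToyWatchedStore
{
public:
    using Watcher = std::function<void(const std::string &path)>;

    // Read a value and optionally leave a one-shot watch on the path.
    std::string Get(const std::string &path, Watcher watcher = nullptr)
    {
        if (watcher)
        {
            m_watchers[path] = std::move(watcher);
        }
        auto it = m_data.find(path);
        return it == m_data.end() ? std::string() : it->second;
    }

    void Set(const std::string &path, const std::string &value)
    {
        m_data[path] = value;
        auto it = m_watchers.find(path);
        if (it == m_watchers.end())
        {
            return;
        }
        Watcher w = std::move(it->second);
        m_watchers.erase(it); // the watch is consumed before it fires
        assert(w);            // only non-empty watchers are ever stored
        w(path);
    }

private:
    std::map<std::string, std::string> m_data;
    std::map<std::string, Watcher> m_watchers;
};

// Client-side cache: key is the node path, value is the node content.
struct CachingClient
{
    ToyWatchedStore &store;
    std::map<std::string, std::string> cache;

    // Re-reads the node and sets the watch again, so the next change is seen too.
    void Refresh(const std::string &path)
    {
        cache[path] = store.Get(path, [this](const std::string &p) { Refresh(p); });
    }
};
```

If Refresh read the value without passing a new watcher, the cache would follow the first change and then silently go stale.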

Using ZooKeeper

// zookeeperutil.h: a thin wrapper around the ZooKeeper C client
#pragma once
#include <zookeeper/zookeeper.h>
#include <string>

// Wrapped zk client class
class ZkClient
{
public:
    ZkClient();
    ~ZkClient();
    // Start the zkclient and connect to zkserver
    void Start();
    // Create a znode on zkserver at the given path
    // state: 0 = persistent (default), ZOO_EPHEMERAL = ephemeral
    void Create(const char *path, const char *data, int datalen, int state=0);
    // Return the value of the znode at the given path
    std::string GetData(const char *path);
private:
    // The zk client handle
    zhandle_t *m_zhandle;
};
// Implementation of the ZkClient wrapper
#include "zookeeperutil.h"
#include "mprpcapplication.h"
#include <semaphore.h>
#include <cstdlib>
#include <iostream>
#include <string>
// Global watcher: receives notifications sent from zkserver to zkclient
void global_watcher(zhandle_t *zh, int type,
                    int state, const char *path, void *watcherCtx)
{
    if (type == ZOO_SESSION_EVENT) // the callback is a session-related event
    {
        if (state == ZOO_CONNECTED_STATE) // zkclient connected to zkserver successfully
        {
            // The context is only set while Start() is waiting, so a later
            // reconnect must not touch the (by then destroyed) semaphore
            sem_t *sem = (sem_t*)zoo_get_context(zh);
            if (sem != nullptr)
            {
                sem_post(sem);
            }
        }
    }
}

ZkClient::ZkClient() : m_zhandle(nullptr)
{
}

ZkClient::~ZkClient()
{
    if (m_zhandle != nullptr)
    {
        zookeeper_close(m_zhandle); // close the handle and release its resources
    }
}

// Connect to zkserver
void ZkClient::Start()
{
    std::string host = MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");
    std::string port = MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");
    std::string connstr = host + ":" + port;

    // Pass the semaphore as the init context, so it is already in place
    // if the session event fires before zookeeper_init returns
    sem_t sem;
    sem_init(&sem, 0, 0);

    /*
    zookeeper_mt: the multithreaded version of the client library.
    It works with three threads:
      - the thread that calls the API
      - a network I/O thread (pthread_create + poll)
      - a watcher callback thread (pthread_create)
    zookeeper_init only starts the connection; the session is established
    asynchronously and reported to global_watcher.
    */
    m_zhandle = zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, &sem, 0);
    if (nullptr == m_zhandle)
    {
        std::cout << "zookeeper_init error!" << std::endl;
        exit(EXIT_FAILURE);
    }

    sem_wait(&sem); // block until the watcher reports ZOO_CONNECTED_STATE
    zoo_set_context(m_zhandle, nullptr); // sem is a local variable: detach it from the handle
    sem_destroy(&sem);
    std::cout << "zookeeper_init success!" << std::endl;
}

// Create a znode at the given path unless it already exists
void ZkClient::Create(const char *path, const char *data, int datalen, int state)
{
    char path_buffer[128];
    int bufferlen = sizeof(path_buffer);
    int flag;
    // First check whether the znode at path exists; if it does, do not create it again
    flag = zoo_exists(m_zhandle, path, 0, nullptr);
    if (ZNONODE == flag) // the znode at path does not exist
    {
        // Create the znode at the given path
        flag = zoo_create(m_zhandle, path, data, datalen,
            &ZOO_OPEN_ACL_UNSAFE, state, path_buffer, bufferlen);
        if (flag == ZOK)
        {
            std::cout << "znode create success... path:" << path << std::endl;
        }
        else
        {
            std::cout << "flag:" << flag << std::endl;
            std::cout << "znode create error... path:" << path << std::endl;
            exit(EXIT_FAILURE);
        }
    }
}

// Get the value of the znode at the given path
std::string ZkClient::GetData(const char *path)
{
    char buffer[64];
    int bufferlen = sizeof(buffer);
    int flag = zoo_get(m_zhandle, path, 0, buffer, &bufferlen, nullptr);
    if (flag != ZOK)
    {
        std::cout << "get znode error... path:" << path << std::endl;
        return "";
    }
    // zoo_get does not null-terminate the buffer; bufferlen now holds the
    // length of the data returned, or -1 if the znode has no data
    if (bufferlen <= 0)
    {
        return "";
    }
    return std::string(buffer, bufferlen);
}
copyright:author[Invincible Dragon Warrior],Please bring the original link to reprint, thank you. https://en.javamana.com/2022/142/202205211822452560.html