HBase data migration implementation

Actually I'm real · 2022-09-23 10:18:00


There are many ways to do this; I used Export and Import and found them quite handy. There are a few scenarios in which I use them, explained separately below:

My requirement is to migrate data from the old cluster to the new cluster, but there are two situations:

1. The two cluster networks can be connected;

2. The two cluster networks cannot be connected.

First, let's talk about the connected case and the problems encountered.

Log in to cluster 1 as root.

Run the following command. My premise is that running hbase shell already works normally.

hbase org.apache.hadoop.hbase.mapreduce.Export tablename hdfs://new-cluster-IP:8020/test/tablename

I don't have much data in this table, so exporting is fast; I did not disable the table in between.
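For reference, the Export job also accepts optional arguments for the number of versions and a start/end time range; running the class with no arguments prints the exact usage for your HBase version. Roughly:

hbase org.apache.hadoop.hbase.mapreduce.Export tablename <outputdir> [<versions> [<starttime> [<endtime>]]]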

Then log in to the new cluster and perform the import operation:

hbase org.apache.hadoop.hbase.mapreduce.Import tablename hdfs://new-cluster-IP:8020/lwb/tablename

Here I got an error:

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=root, access=WRITE, inode="/user":hdfs:supergroup:drwxr-xr-x 

This error means that root does not have permission on HDFS. Online there are two suggested fixes: 1. grant root the permission; 2. modify the HDFS parameters and restart.

I used neither; I just switched to the hdfs user: su hdfs

I don't have a password for this user. After that, I ran the import command above directly, got another exception, and the import failed:

Error: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1619 actions: tablename: 1619 times,

The reason is that I had not created the table in the new HBase cluster, so I created a new table named tablename.
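A minimal sketch of that step, assuming the source table has a single column family named cf (use describe 'tablename' on the old cluster to see the real schema, which must match), run in the HBase shell on the new cluster:

create 'tablename', 'cf'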

Then I performed the import again, and it succeeded.


2. The two clusters are completely disconnected

1. Export the table data of the old cluster into the old cluster's own HDFS:

hbase org.apache.hadoop.hbase.mapreduce.Export tablename hdfs://old-cluster-IP:8020/test/tablename
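As a quick check that the export produced output, the generated files can be listed on the old cluster's HDFS (path as in the command above):

hdfs dfs -ls /test/tablename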

2. Download the HDFS files; here I used a Java program to download them. The code is as follows.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;

public class HDFSTest {
    // Note: when downloading and uploading, change this to the old or new cluster's IP address respectively
    private static String HDFSUri = "hdfs://10.10.1.68:8020";

    public static void main(String[] args) throws Exception {
        String HDFSFile = "/lwb/FASTAChargeMsg";
        String localFile = "D:\\FASTAChargeMsg";
        // Connect to the file system.
        // This line is required, otherwise a permission error is reported; "hdfs" is the
        // Hadoop user that is authorized to operate on HDFS.
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        FileSystem fs = getFileSystem();
        System.out.println(fs.getUsed());
        // Create a directory
        // mkdir("/zhaojy2");
        // Check whether it exists
        // System.out.println(existDir("/zhaojy2", false));
        // Upload a file to HDFS
        copyFileToHDFS("D:\\part-m-00002", "/lwb/FASTAGPRSMsg/part-m-00002");
        // Download a file to the local machine
        // getFile("/lwb/FASTAGPRSMsg/part-m-00002", "D:\\part-m-00002");
        // getFile(HDFSFile, localFile);
        // Delete a file or directory
        // rmdir("/lwb/FASTAGPRSMsg");
        // Read a file
        // readFile("/zhaojy/HDFSTest.txt");
    }
    /**
     * Get the file system.
     *
     * @return FileSystem the file system
     */
    public static FileSystem getFileSystem() {
        // Read the configuration
        Configuration conf = new Configuration();
        // File system
        FileSystem fs = null;
        String hdfsUri = HDFSUri;
        if (StringUtils.isBlank(hdfsUri)) {
            // Return the default file system; when running inside the Hadoop cluster,
            // this gets the default file system directly
            try {
                fs = FileSystem.get(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            // Return the specified file system; when testing locally,
            // use this method to obtain the file system
            try {
                URI uri = new URI(hdfsUri.trim());
                fs = FileSystem.get(uri, conf);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return fs;
    }
    /**
     * Create a directory.
     *
     * @param path
     *            file path
     */
    public static void mkdir(String path) {
        try {
            FileSystem fs = getFileSystem();
            System.out.println("FilePath=" + path);
            // Create the directory
            fs.mkdirs(new Path(path));
            // Release resources
            fs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * Check whether a directory exists.
     *
     * @param filePath
     *            directory path
     * @param create
     *            whether to create it if it does not exist
     */
    public static boolean existDir(String filePath, boolean create) {
        boolean flag = false;
        if (StringUtils.isEmpty(filePath)) {
            return flag;
        }
        try {
            Path path = new Path(filePath);
            // FileSystem object
            FileSystem fs = getFileSystem();
            if (create) {
                if (!fs.exists(path)) {
                    fs.mkdirs(path);
                }
            }
            if (fs.isDirectory(path)) {
                flag = true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }
    /**
     * Upload a local file to HDFS.
     *
     * @param srcFile
     *            source file path
     * @param destPath
     *            HDFS path
     */
    public static void copyFileToHDFS(String srcFile, String destPath) throws Exception {
        // Read the local file
        FileInputStream fis = new FileInputStream(new File(srcFile));
        Configuration config = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(HDFSUri + destPath), config);
        Path p = new Path(HDFSUri + destPath);
        OutputStream os = fs.create(p);
        FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
        fs.setPermission(p, permission);
        // Copy; copyBytes with close=true closes both streams when done
        IOUtils.copyBytes(fis, os, 4096, true);
        System.out.println("Copy finished...");
        fs.close();
    }
    /**
     * Download a file from HDFS to the local machine.
     *
     * @param srcFile
     *            HDFS file path
     * @param destPath
     *            local path
     */
    public static void getFile(String srcFile, String destPath) throws Exception {
        // Full HDFS file address
        String file = HDFSUri + srcFile;
        Configuration config = new Configuration();
        // Build the FileSystem
        FileSystem fs = FileSystem.get(URI.create(file), config);
        // Open the file
        InputStream is = fs.open(new Path(file));
        // Save to the local path; copyBytes with close=true closes the streams at the end
        IOUtils.copyBytes(is, new FileOutputStream(new File(destPath)), 2048, true);
        System.out.println("Download finished...");
        fs.close();
    }
    /**
     * Delete a file or directory.
     *
     * @param path
     */
    public static void rmdir(String path) {
        try {
            // Get the FileSystem object
            FileSystem fs = getFileSystem();
            String hdfsUri = HDFSUri;
            if (StringUtils.isNotBlank(hdfsUri)) {
                path = hdfsUri + path;
            }
            System.out.println("path:" + path);
            // Delete the file or directory; the single-argument delete(Path f) is deprecated,
            // so the recursive overload is used
            System.out.println(fs.delete(new Path(path), true));
            // Release resources
            fs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * Read the contents of a file.
     *
     * @param filePath
     * @throws IOException
     */
    public static void readFile(String filePath) throws IOException {
        Configuration config = new Configuration();
        String file = HDFSUri + filePath;
        FileSystem fs = FileSystem.get(URI.create(file), config);
        // Open the file
        InputStream is = fs.open(new Path(file));
        // Copy to standard output
        IOUtils.copyBytes(is, System.out, 2048, false);
        fs.close();
    }
}

3. Upload the files to the new cluster's HDFS using the program above.
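If shell access to nodes of both clusters is available, steps 2 and 3 can also be done with the standard HDFS CLI instead of the Java program. A rough sketch, assuming the exported data sits under /test/tablename on the old cluster, a local copy is carried across by other means (for example, removable media), and the import will read from /lwb/tablename on the new cluster:

hdfs dfs -get /test/tablename /tmp/tablename     (run on the old cluster)
hdfs dfs -put /tmp/tablename /lwb/tablename      (run on the new cluster)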

4. Run the import command on the new cluster; it is the same as for the connected case.

hbase org.apache.hadoop.hbase.mapreduce.Import tablename hdfs://new-cluster-IP:8020/lwb/tablename

Here I tested a table with over 3 million rows, about 2 GB in size; the import and export speed was acceptable.
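To sanity-check the migration, the row count on the new cluster can be compared against the old one, for example with the bundled RowCounter MapReduce job (or the count command in the HBase shell for small tables):

hbase org.apache.hadoop.hbase.mapreduce.RowCounter tablename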



