HDFS 上传和下载文件

    xiaoxiao2023-10-08  169

    import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import java.io.*; import java.net.URI; import java.util.ArrayList; import java.util.List; /** * 操作HDFS分布式文件系统工具类 * 进行目录创建、删除,文件上传、删除、下载等操作 * * */ public class HDFSUtil { static public final String ROOTPATH = "hdfs://IP:port/"; /** * @description: 判断HDFS上是否存在目录或者文件 */ public static boolean isExists(String hdfsFilePath) { if (!hdfsFilePath.startsWith("hdfs://")) { hdfsFilePath = ROOTPATH + hdfsFilePath; } Configuration conf = new Configuration(); Path path = new Path(hdfsFilePath); FileSystem fs; try { fs = path.getFileSystem(conf); return fs.exists(path); } catch (IOException e) { e.printStackTrace(); } return false; } /** * 创建目录 * @param path 目录路径 * @return * @throws IOException */ public static boolean mkdir(String path) throws IOException { if (!path.startsWith("hdfs://")) { path = ROOTPATH + path; } Configuration conf = new Configuration(); Path srcPath = new Path(path); FileSystem fs = srcPath.getFileSystem(conf); return fs.mkdirs(srcPath); } /** * 获取指定路径目录下所有文件名(包括目录) * @param path 相对路径 * @return */ public static List<String> getFileList(String path) { if (!path.startsWith("hdfs://")) { path = ROOTPATH + path; } List<String> files = new ArrayList<String>(); Configuration conf = new Configuration(); Path srcPath = new Path(path); try { FileSystem fs = srcPath.getFileSystem(conf); if (fs.exists(srcPath) && fs.isDirectory(srcPath)) { for (FileStatus statu : fs.listStatus(srcPath)) { files.add(statu.getPath().toString()); } } } catch(Exception e) { e.printStackTrace(); files = null; } return files; } /** * 删除指定路径文件或目录 * @param path 路径 * @return * @throws IOException */ public static boolean deleteFile(String path) throws IOException { if (!path.startsWith("hdfs://")) { path = ROOTPATH + path; } Configuration conf = new Configuration(); Path pathObj = new Path(path); FileSystem fs = pathObj.getFileSystem(conf); return fs.deleteOnExit(pathObj); } /** * 下载HDFS文件或目录到本地 * @param hdfsFilePath hdfs文件或目录路径 * @param localDirPath 本地路径 * @throws IOException */ public static void downloadFile(String hdfsFilePath, String localDirPath) throws IOException { if (!hdfsFilePath.startsWith("hdfs://")) { hdfsFilePath = ROOTPATH + hdfsFilePath; } File localDir = new File(localDirPath); if (!localDir.exists()) { localDir.mkdirs(); } Configuration conf = new Configuration(); Path path = new Path(hdfsFilePath); FileSystem fs = path.getFileSystem(conf); if (fs.isFile(path)) { FSDataInputStream in = null; FileOutputStream out = null; try { in = fs.open(path); File srcfile = new File(localDir, path.getName()); if (!srcfile.exists()) srcfile.createNewFile(); out = new FileOutputStream(srcfile); IOUtils.copyBytes(in, out, 4096); } finally { if (in != null) in.close(); if (out != null) out.close(); } } else { String[] subPath = hdfsFilePath.split("/"); String newLocalPath = localDirPath + subPath[subPath.length - 1] + File.separator; if (fs.exists(path) && fs.isDirectory(path)) { FileStatus[] status = fs.listStatus(path); for (FileStatus statu : status) { downloadFile(statu.getPath().toString(), newLocalPath); } } } } /** * 上传文件或目录到HDFS指定路径下 * @param localPath 本地文件或目录路径 * @param dstDir hdfs指定路径 * @throws IOException */ public static void uploadLocalFile(String localPath, String dstDir) throws IOException { Configuration conf = new Configuration(); File localFile = new File(localPath); if (localFile.isDirectory()) { copyDir(localPath, dstDir, conf); } else { copyFile(localPath, dstDir, conf); } } /** * 复制目录 * @param localPath 本地目录路径 * @param dstDir hdfs路径 * @param conf hdfs配置 * @throws IOException */ private static void copyDir(String localPath, String dstDir, Configuration conf) throws IOException { if (!dstDir.startsWith("hdfs://")) { dstDir = ROOTPATH + dstDir; } Path path = new Path(dstDir); FileSystem fs = path.getFileSystem(conf); if (!fs.exists(path)) { fs.mkdirs(path); } File[] localFiles = (new File(localPath)).listFiles(); for (File localFile : localFiles) { if (localFile.isDirectory()) { String fileName = localFile.getName(); if (dstDir.endsWith("/")) { copyDir(localFile.getPath(), dstDir + fileName +"/", conf); } else { copyDir(localFile.getPath(), dstDir + "/" + fileName +"/", conf); } } else { copyFile(localFile.getPath(), dstDir, conf); } } } /** * 复制文件至HDFS * @param localPath 本地文件路径 * @param dstDir hdfs指定路径 * @param conf hdfs配置 * @throws IOException * @throws FileNotFoundException */ private static void copyFile(String localPath, String dstDir, Configuration conf) throws IOException, FileNotFoundException { File localFile = new File(localPath); String fileName = localFile.getName(); String dstPath = dstDir + fileName; if (!dstPath.startsWith("hdfs://")) { dstPath = ROOTPATH + dstPath; } Path path = new Path(dstPath); FileSystem fs = path.getFileSystem(conf); fs.exists(path); InputStream is = new BufferedInputStream(new FileInputStream(localFile)); OutputStream os = fs.create(path); IOUtils.copyBytes(is, os, 4096); is.close(); } }

     

    最新回复(0)