【java】java处理linux上的日志解决方案(一)

    xiaoxiao2022-07-13  159

    嘎嘎嘎,我又肥来了,还是菜鸡一只,最近产品上线,要做写监控啊,日志解析啊,所以来记录下所做的事情~

    对于日志的解析!

     

    1、业务场景:

    每天会有一个日志数据自动推到我们服务器上的某个目录,然后需要解析日志,将解析好的数据插到数据库中,供其他人使用和查询

    2、思考:

    1、首先,日志推过来是一个压缩文件(.tar.gz),并且是文件名是uuid,像这样(log_134356536522412341.tar.gz,反正就是一段乱七八糟的数字),因为我需要解压,并且读取解压后的数据

    2、数据不能重复插入,也就是说一个日志处理完之后,下次不能再次解析插入,但是也不能直接在程序中删除日志,万一解析失败,结果还把日志删了,这样数据丢失很麻烦,一般我们都不会写自动的东西来删数据,还是手动做这种操作比较安全

    3、因为需要插入数据库,所以需要用开发语言来操作(我对java比较熟,所以用java)

     

    3、方案:

    1、所以我第一步是通过java执行linux的解压命令,来获得解压出来的文件名称,就知道现有哪些日志!

    cd 到日志存放的目录;for tar in `ls 日志存放的目录 | grep 'log_'`; do tar -zxvf $tar; done;

    2、在数据库中维护一张表B,存放已经处理的日志名称

    3、select * from 表B,和我解压出来的文件做差集(得到哪些日志文件未处理)

    4、处理差集的日志文件,存数据到表A,并且把当前这个文件的文件名存入表B,下次就算解压出这个文件,也会因为处理过而不重复处理

     

    4、需要注意的操作:

    1、java执行linux命令(tar -zxvf)获得回显信息!!(这个很重要,否则你不知道你解压了什么),我会单独开一篇文章来说明这个东西

    2、如何获得两个集合的差集?

     

    5、因此代码如下:

    1、先是main方法!

    public static void main(String[] args) throws Exception { //如果从外部传入文件名,就单独处理这个文件 if(args.length!=0){ String fileName=args[0]; //加载传入的文件,获得数据list ArrayList<HashMap<String,String>> dataList = loadData2Map(fileName); //插入结果 insertData(dataList,fileName); }else{ //否则解压获得spark日志 List<String> sparkLogList = tarAndGetLogList(); //查询数据库中已经处理的日志 List<String> dataBaseSparLogNameList = getProcessedLogs(); //解压出来的日志与数据库已处理的日志的差值(未处理过的日志的列表) List<String> reduceList = sparkLogList.stream().filter(item -> !dataBaseSparLogNameList.contains(item)).collect(toList()); if(reduceList.isEmpty()){ System.out.println("没有需要插入的日志数据"); return; } //处理日志 for (String file:reduceList) { String completeFile = "/拼接日志的完整路径/"+file; ArrayList<HashMap<String,String>> dataList = loadData2Map(completeFile); insertData(dataList,file); } } }

    2、通过java的IO流读入数据,做成key-value的list

    /** * 传入读取的日志的文件名,将里面的数据加载成list * 需要解析的日志格式 end=key1:value1,key2:value2,key3:value3,... * @param fileName * @return */ public static ArrayList<HashMap<String,String>> loadData2Map(String fileName){ File f = new File(fileName); ArrayList<HashMap<String,String>> dataList = new ArrayList<HashMap<String,String>>(); try { int count=0; FileReader fr = new FileReader(f); BufferedReader br = new BufferedReader(fr); while (true) { String line = br.readLine(); if (null == line){ break; } if (line.contains("end=")) { count++; HashMap<String,String> dataMap = new HashMap<String,String>(); String[] valueArr = line.split("=")[1].split(","); for (String keyAndValue: valueArr){ String[] tempArr = keyAndValue.split(":"); String key = tempArr[0]; String value = tempArr[1]; dataMap.put(key,value); } dataMap.put("log_desc",line); dataList.add(dataMap); } } System.out.println("出现end的行数为:"+count); } catch (Exception e) { e.printStackTrace(); } return dataList; }

    2、插入数据的方法!!!

    /** * 将读取的spark日志获得的list,插入数据库中,并更新irt_unicom_log_file(记录该文件以及处理过) * @param list 该文件的内容 * @param fileName 插入的文件名称 * @throws Exception */ public static void insertData( ArrayList<HashMap<String,String>> list,String fileName) throws Exception { DBUtil dbutil = new DBUtil(); Connection connPost = dbutil.getConnection(); String sql = "INSERT INTO 库.表A (字段1,字段2,...字段7,gmt_modified) values (?,?,?,?,?,?,?,now())"; PreparedStatement psInto = connPost.prepareStatement(sql,ResultSet.TYPE_SCROLL_SENSITIVE,ResultSet.CONCUR_READ_ONLY); connPost.setAutoCommit(false); for (HashMap<String,String> map : list) { //循环遍历map,处理获得各个字段的值 //..... psInto.setString(1, 字段1); psInto.setString(2, 字段2); psInto.setString(3, 字段3); psInto.setString(4, 字段4); psInto.setLong(5, 字段5); psInto.setString(6, 字段6); psInto.setInt(7, 字段7); psInto.addBatch(); } int[] count = psInto.executeBatch(); connPost.commit(); dbutil.closeConnection(null,psInto,connPost); System.out.println("执行成功"); //将处理过的文件名插入表B中,避免重复处理 Connection connPost2 = dbutil.getConnection(); String sql2 = "INSERT INTO 库.表B (log_name,gmt_modified) values (?,now())"; PreparedStatement psInto2 = connPost2.prepareStatement(sql2,ResultSet.TYPE_SCROLL_SENSITIVE,ResultSet.CONCUR_READ_ONLY); psInto2.setString(1,fileName); psInto2.execute(); dbutil.closeConnection(null,psInto2,connPost2); }

     

    还有部分代码没有贴出来,我会在下一篇更新中提供这部分代码的~

    剩下两个方法!

    1、tarAndGetLogList

    2、getProcessedLogs

    剩两个工具类

    3、CommandUtil

    4、DBUtil

     

    好,本人菜鸡一个,有任何问题欢迎留言,我会一一回答,下次再见拜拜~~!

    最新回复(0)