avro使用详解

    xiaoxiao2022-07-07  200

    一、avro的介绍 1、概括 avro是一个数据序列化系统,它提供

    丰富的数据结构 快速可压缩的二进制数据形式 存储持久数据的文件容器 远程过程调用RPC 简单的动态语言结合功能 2、类型  

    二、avro在hadoop的使用 1、模式确定 例如:{"namespace": "example.avro",         "type": "record",         "name": "User",         "fields": [         {"name": "name", "type": "string"},         {"name": "favorite_number",  "type": ["int", "null"]},         {"name": "favorite_color", "type": ["string", "null"]}         ]     } 其中namespace是包名,name是类名

    2、text数据作为输入 2.1 无插件的序列化 //创建数据记录 Schema schema = new Schema.Parser().parse(new File("user.avsc")); GenericRecord user1 = new GenericData.Record(schema); user1.put("name", "Alyssa"); user1.put("favorite_number", 256); // Leave favorite color null

    GenericRecord user2 = new GenericData.Record(schema); user2.put("name", "Ben"); user2.put("favorite_number", 7); user2.put("favorite_color", "red");

    //序列化 // Serialize user1, user2 and user3 to disk DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); dataFileWriter.create(user1.getSchema(), new File("users.avro")); dataFileWriter.append(user1); dataFileWriter.append(user2); dataFileWriter.append(user3); dataFileWriter.close();

    //反序列化 // Deserialize Users from disk DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class); DataFileReader<User> dataFileReader = new DataFileReader<User>(file, userDatumReader); User user = null; while (dataFileReader.hasNext()) {     // Reuse user object by passing it to next(). This  saves us from     // allocating and garbage collecting many objects for   files with     // many items.     user = dataFileReader.next(user);     System.out.println(user); } 2.2有插件的序列化 2.2.1 插件导入 <plugin>   <groupId>org.apache.avro</groupId>   <artifactId>avro-maven-plugin</artifactId>   <version>1.8.2</version>   <executions>     <execution>       <phase>generate-sources</phase>       <goals>         <goal>schema</goal>       </goals>       <configuration>         <sourceDirectory>${project.basedir}/../</sourceDirectory>         <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>       </configuration>     </execution>   </executions> </plugin> 2.2.2 编译schema文件 注意schema文件放在指定的文件中    在idea中编译此文件,使之在目录中生成class文件

    2.2.3 常规使用 DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); dataFileWriter.create(user1.getSchema(), new File("users.avro")); dataFileWriter.append(user1); dataFileWriter.append(user2); dataFileWriter.append(user3); dataFileWriter.close();

    //反序列化 // Deserialize Users from disk DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class); DataFileReader<User> dataFileReader = new DataFileReader<User>(file, userDatumReader); User user = null; while (dataFileReader.hasNext()) {     // Reuse user object by passing it to next(). This saves us from     // allocating and garbage collecting many objects for files with     // many items.     user = dataFileReader.next(user);     System.out.println(user); } 3、例子(使用的是有插件的方式) MapReduceColorCount:

    package example;

    import java.io.IOException;

    import org.apache.avro.Schema; import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapred.AvroValue; import org.apache.avro.mapreduce.AvroJob; import org.apache.avro.mapreduce.AvroKeyInputFormat; import org.apache.avro.mapreduce.AvroKeyValueOutputFormat; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;

    import example.avro.User;

    public class MapReduceColorCount extends Configured implements Tool {

      public static class ColorCountMapper extends   Mapper<AvroKey<User>, NullWritable, Text, IntWritable> {

        @Override     public void map(AvroKey<User> key, NullWritable value, Context context)         throws IOException, InterruptedException {

          CharSequence color = key.datum().getFavoriteColor();       if (color == null) {         color = "none";       }       context.write(new Text(color.toString()), new IntWritable(1));       }     }

        public static class ColorCountReducer extends   Reducer<Text, IntWritable, AvroKey<CharSequence>, AvroValue<Integer>> {

        @Override     public void reduce(Text key, Iterable<IntWritable> values,         Context context) throws IOException, InterruptedException {

          int sum = 0;       for (IntWritable value : values) {         sum += value.get();       }       context.write(new AvroKey<CharSequence>(key.toString()), new AvroValue<Integer>(sum));     }     }

      public int run(String[] args) throws Exception {     if (args.length != 2) {       System.err.println("Usage: MapReduceColorCount <input path> <output path>");       return -1;     }

        Job job = new Job(getConf());     job.setJarByClass(MapReduceColorCount.class);     job.setJobName("Color Count");

        FileInputFormat.setInputPaths(job, new Path(args[0]));     FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setInputFormatClass(AvroKeyInputFormat.class);     job.setMapperClass(ColorCountMapper.class);     AvroJob.setInputKeySchema(job, User.getClassSchema());     job.setMapOutputKeyClass(Text.class);     job.setMapOutputValueClass(IntWritable.class);

        job.setOutputFormatClass(AvroKeyValueOutputFormat.class);     job.setReducerClass(ColorCountReducer.class);     AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));     AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT));

        return (job.waitForCompletion(true) ? 0 : 1);   }

      public static void main(String[] args) throws Exception {     int res = ToolRunner.run(new MapReduceColorCount(), args);     System.exit(res);   } } 注意:当采用不用插件的方式时,没有生成的User类,map的代码如下  @Override  public void map(AvroKey<GenericRecord> key, NullWritable value, Context context)throws IOException,InterruptedException {}  此时需要在配置作业时(即run方法中)使用AvroJob.setDataModelClass(job,GenericData.class);指定使用通用(generic)数据模型来读取记录;如需指定读取所用的schema,可另外调用AvroJob.setInputKeySchema(job, schema)。  

    最新回复(0)