Camus源码分析

协议:

输出文件压缩:Camus默认只支持两种压缩格式(snappy和deflate),默认是defalte,使用 StringRecordWriterProvider写入文本格式文档时,还可以指定gzip的压缩格式,扩展其它压缩格式很容易,只需要添加 两行代码就可以,建议增加lzo和lzop的压缩格式,以和我们Hive保持一致。

输出格文件类型:建议文本格式的文件

文件目录规则:(配置的目录)+ topic名 + daily|hour + (年/月/日)|(年/月/日/小时) + 数据文件,例如:/rocketmq/data/vip_ods_heartbeat/daily/2015/06/10 /vip_ods_heartbeat.broker-a.0.999.48388735.1433865600000.deflate

文件名规则:topic名+ (RocketBrokerId)|(kafka的对应分区的learder的BrokerId)+ (RocketQueueId)|(kafka分区号)+ 写入消息行数 + 最后一条消息的Offset + 编码的分区(时间 + 压缩格式后缀),例如:vip_ods_heartbeat.broker- a.0.999.48388735.1433865600000.deflate

Topic的命名规则:业务标识+数据库名+数据库表名(分表只需要BaseName就可以),例 如:vip_ods_heartbeat

消息格式:操作类型\t表名(分表的话是分表名)\t数据库名\t主键名\t唯一索引\tBinlog日志时间 \tCheckPoint字段\tDataBefore\tDataAfter,库名表名都是RockMQ中的原始数据,在生成列数据时,列中数 据如果有\t等特殊字符需要替换,例如:
insert\theartbeat\tvip_ods\tid\tname,pid\t1232132131\t120@21\t{“字段名”,”字段 值”,…}\t{“字段名”,”字段值”,…}

确定数据导入是否完成:Camus中会在History的目录中存放历次消费的状态,包括开始执行的分区和它们的Offset、 执行结束位置的分区和它们的Offset,这两个文件以SequenceFile的形式存放在HDFS文件中,另外Camus在执行结束后可以把 执行信息汇总发送到Kafka的Topic中,Topic的名字为:TrackingMonitoringEvent,如果监控程序监控这个 Topic,是可以得到当前执行的情况的信息的。请评估一下这两种方式,看是否够用,有没有必要自行实现Checker机制。

Leave a Reply

Your email address will not be published. Required fields are marked *