Camus实现要点

输出文件压缩:Camus默认只支持两种压缩格式(snappy和deflate),默认是defalte,使用 StringRecordWriterProvider写入文本格式文档时,还可以指定gzip的压缩格式,扩展其它压缩格式很容易。

文件目录规则:(配置的目录)+ topic名 + daily|hour + (年/月/日)|(年/月/日/小时) + 数据文件,例如:/rocketmq/data/vip_ods_heartbeat/daily/2015/06/10 /vip_ods_heartbeat.broker-a.0.999.48388735.1433865600000.deflate

文件名规则:topic名+ kafka的对应分区的learder的BrokerId+ kafka分区号+ 写入消息行数 + 最后一条消息的Offset + 编码的分区(时间 + 压缩格式后缀),例如:vip_ods_heartbeat.broker- a.0.999.48388735.1433865600000.deflate

确定数据导入是否完成:Camus中会在History的目录中存放历次消费的状态,包括开始执行的分区和它们的Offset、 执行结束位置的分区和它们的Offset,这两个文件以SequenceFile的形式存放在HDFS文件中,另外Camus在执行结束后可以把 执行信息汇总发送到Kafka的Topic中,Topic的名字为:TrackingMonitoringEvent,如果监控程序监控这个 Topic,是可以得到当前执行的情况的信息的。

Sqoop使用分析

Sqoop的Mysql数据导出实现分两种,一种是使用JDBC方式从Mysql中获取数据,一种是使用MysqlDump命令从MySql中获取数据,默认是 JDBC方式获取数据,如果要使用dump方式获取数据,需要添加 -direct 参数。

先说第一种:
配置语句时,需要添加 $CONDITIONS 点位符,比如:SELECT id FROM user WHERE $CONDITIONS,Sqoop在内部实现时会把它替换成需要的查询条件。
Sqoop起动后会先查询元数据,它会把 $CONDITIONS 替换为 (1=0) ,然后用得到的SQL语句查询数据库。这块Sqoop的实现不太好,对于导出一个表的情况,它会使用这个SQL查询三次数据库,分别是:获取 colInfo(最终得到columnTypes信息)、查询ColumnNames信息、生成QueryResult类时 generateFields操作获取columnTypeNames时。
Sqoop会对获取的Fields做校验,列不能重复,它还会处理数据库的字段到Java属性名的转换
QueryResult类是通过构建类文件,然后获取JavaCompiler,然后编译加载,为了提高处理性能,这块不是使用反射 实现,这个生成类内部处理mysql到hdfs属性值为空和分隔符的处理。
接着它会进行下面一个Sql查询操作,查询结果集为MIN(split列),MAX(split列),查询条件的处理逻辑为 $CONDITIONS 替换为(1=1),然后组合 (举例: SELECT MIN(id), MAX(id) FROM (SELECT ID,NAME,PASSPORT WHERE (1=1) ) AS t1 ),这样就查询出来此次导出数据最大的split列值和最小的split列值。
对于为整数、布尔值、时间格式、Float等 的分区列,进行split构建比较容易,这里就不多说,对于Text文本的处理方式需要解释一下,其先会对之前获取到的Min和Max的字串寻找它们最大 的相同字串,然后对于后面的字段转化为BigDecimal,结合char占两个字节(65536),进行处理,算法在 TextSplitter类中,比较简单,就是一个进制转换的问题。拆分好后,需要把Split的值再转换为String,然后加上公共 前缀,就构成了查询区间了。
其对数据的获取是在DataDrivenDBRecordReader中,在查询时会把 $CONDITIONS 替换成 split 的范围比如 ( id >= 1) && (id < 10),使用JDBC获取到游标,然 后移动游标处理数据。

第二种方法与第一种方式有下面的差别:
初始化元数据,它是在构建的查询语句后面添加 limit 1 ,比如:SELECT t.* FROM `user` AS t LIMIT 1,因为dump方式查询指定获取列是 t.*,当使用limit 0时,数据库不会给它返回必须的元数据信息。
dump方式在map进行数据的获取,其会构建mysqldump命令,然后使用java去调用,输入输出流和错误流,其实现了 org.apache.sqoop.util.AsyncSink抽象类,用来处理输入输出流和错误流。

优化策略:

Sqoop查询无数据会进行三次相同的Sql查询,可以合并查询,不过由于查询很快,这块不需要修改实现。
分区列选择对于查询元数据和导出的查询都有影响,应该对索引做调优,避免对分区列的排序操作,加快元数据查询速度和导出数据的速度,尽量选择自增加的主键ID做Split列,区分度好并且可以顺序读取数据。
导出操作的查询语句中,$CONDITIONS 会被替换为范围区间,创建索引时,要考虑这个查询的优化。
索引建议,考虑三个规则(使查询数据集较少、减少点的查询、避免排序操作),Sqoop场景下,如果分区列不是主键(自增加)时,把分 区列做为联合索引的第一个字段,其它被选择的查询条件做为索引的其它字段,可优化此查询。
分区列的选择,要避免Split后数据不均衡。
从实现上来看-m参数是可以增加任务的并行度的,但数据库的读线程是一定的,所以-m过大对于数据库会是一个压力,当然可以限制任务的同时最多拥有资源量。在Sqoop的场景下,数据库才是一个影响并发的瓶颈,增加job数意义不大。

下面列出Sqoop目前1.4.6版本存在的两个问题。

查看Sqoop源码,发现其存在两个比较严重的问题。

问题 1、数据分片与Mapper的问题
Sqoop在抽取时可以指定-m的参数,但这个-m的参数是控制mapper的数量的,但它也决定了最后能够生成的文件的数目,调节这个值可以实现对结果文件大小的控制,但是,如果产生的文件的格式不能够被分割,那么对这个数据的下游性能有很大影响,同时Sqoop在启动时会启动-m个MapperTask,会对数据库产生m的并发读取,需要修改Sqoop的实现,合并多个Split到同一个Mapper中。
个人建议可以加个 -split-per-map 参数,比如设置-m=4 -split-per-map=2,则对结果集分 8 片,每个Mapper处理两片数据,最后共产生 8 个文件。

问题 2、分片效率低
Sqoop在做分片处理时有问题,其实现会使用Select Max(splitKey),Min(splitKey) From ( –select参数 ) as t1查询分片信息,在Mysql下,这样的查询会产生一个以split-id为主键的临时表,如果数据量不大,临时表数据可以在内存中,处理速度还可以保证。但如果数据量很大,内存中已经存放不下时,这些数据会被保存为MyISAM表存放到磁盘文件中,如果数据量再大一些,磁盘文件已经存放不下临时表时,拆分数据会失败。如果数据量大,即使没有查询也会很慢,大约会占用整个导出时间的45%,优化空间很大,如果不修改实现的话,不适合做大数据量表的全量数据导出操作。
解决方案一:
配置–boundary-query参数,指定使用的查询语句
解决方案二:

修改:
org.apache.sqoop.mapreduce.DataDrivenImportJob的

@Contract(“null, _ -> !null”)
private String buildBoundaryQuery(String col, String query)

修改代码如下

/**
   * Build the boundary query for the column of the result set created by
   * the given query.
   * @param col column name whose boundaries we're interested in.
   * @param query sub-query used to create the result set.
   * @return input boundary query as a string
   */
  private String buildBoundaryQuery(String col, String query) {
    if (col == null || options.getNumMappers() == 1) {
      return "";
    }

    // Replace table name with alias 't1' if column name is a fully
    // qualified name.  This is needed because "tableName"."columnName"
    // in the input boundary query causes a SQL syntax error in most dbs
    // including Oracle and MySQL.
    String alias = "t1";
    int dot = col.lastIndexOf('.');
    String qualifiedName = (dot == -1) ? col : alias + col.substring(dot);

    ConnManager mgr = getContext().getConnManager();
    String ret = mgr.getInputBoundsQuery(qualifiedName, query);
    if (ret != null) {
      return ret;
    }

//    return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
//        + "FROM (" + query + ") AS " + alias;
    return initBoundaryQuery(qualifiedName, query, alias);
  }

  private String initBoundaryQuery(String qualifiedName, String query, String alias) {
    StringBuilder regex = new StringBuilder();
    regex.append("(\\s[A|a][S|s][\\s][`]?");
    for (char c : qualifiedName.toCharArray()) {
      regex.append('[').append(c).append(']');
    }
    regex.append("[`|\\s|,])");
    final Matcher matcher1 = Pattern.compile(regex.toString()).matcher(query);
    final boolean asCheckOk = !matcher1.find();
    if(asCheckOk) {
      final Matcher matcher2 = Pattern.compile("(\\s[F|f][R|r][O|o][M|m]\\s)").matcher(query);
      int count = 0;
      while (matcher2.find()) {
        count++;
      }
      boolean fromCheckOk = count == 1;
      if(fromCheckOk) {
        final Matcher matcher = Pattern.compile("(\\s[F|f][R|r][O|o][M|m]\\s[\\s\\S]*)").matcher(query);
        while (matcher.find()) {
          return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
                  + matcher.group();
        }
      }
    }
    return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
            + "FROM (" + query + ") AS " + alias;
  }

问题 3、对非整形和布尔型的字段分区可能有数据丢失风险
Sqoop实现分区数据替换时,没有使用Prepared statement来做,而是简单的在查询时会把 $CONDITIONS 替换成 split 的范围比如 ( id >= xxxx) && (id < yyyy),但当String进行分区时,得到的xxxx和yyyy有很大可能是乱码,然后就会引起查询问题,这个在使用非direct方式时,可以通过修改为Prepared Statement解决(未测试)。但在Direct方式下其导出数据使用的是mysqldump方式,通过命令行传递查询参数,就无法解决了。(其实可以修改它的Split算法,使得范围区间不会产生乱码),所以建议不要使用非整形的列做拆分列。

问题4、Mysql中数据影响导出
Mysql在的timestamp列允许 “0000:00:00 00:00:00″ 这样的数据存储,在JDBC的实现中,Timestamp的格式是会被转化为java.sql.Timestamp的对象的,但java.sql.Timestamp对象无法表示 “0000:00:00 00:00:00″,所以在调用java.sql.Timestamp getTimestamp(int columnIndex) throws SQLException;这个方法时 SQLException 会被抛出来,Sqoop的JDBC方式导出数据到HDFS的实现就是采用这个方法去读取Timestamp的数据,当数据中出现这样的时间存储时,就直接抛出了SQLException异常,这个异常没有被捕获,导致整个导出失败。
我们可以在Sqoop做相应的修改,让它避免抛出异常,使任务可以执行下去。

// 代码在
// org.apache.sqoop.orm.ClassWriter private void myGenerateDbRead(Map<String, Integer> columnTypes,
//                               String[] colNames,
//                               StringBuilder sb,
//                               int methodNumber,
//                               int size,
//                               boolean wrapInMethod)

...
      if("java.sql.Timestamp".equals(javaType)) {
        sb.append("    try {\n");
      }

      sb.append("        this." + col + " = JdbcWritableBridge." +  getterMethod
          + "(" + (i + 1) + ", __dbResults);\n");

      if("java.sql.Timestamp".equals(javaType)) {
        sb.append("    } catch (SQLException e) {\n    this." + col + " = null;\n    }");
      }
...

问题4、Sqoop导出时数据中特殊字符的替换
Sqoop抽取时可以对Hive默认的分隔符做替换,它们是\n \r \01,可以使用 –hive-drop-import-delims做替换,但是它的实现是写死的,如果我们采用的不是Hive默认的分隔符,那么它就不会做相应的替换操作,在Hive中很多人习惯使用\t做列分隔,因为mysql的客户端导出文本默认就是以\t导出的,Sqoop不会对这个数据进行替换。
有两种方法可以解决这个问题。

方法1:修改Sqoop实现,代码在 org.apache.sqoop.lib.FieldFormatter
方法2:由Mysql做替换,Sql语句可以写为: replace(colname, “\t”, “”) as colname

问题5、sqoop导入mysql数据出错
这个是由于mysql-connector-java的bug造成的,出错时我用的是mysql-connector-java-5.1.10-bin.jar,更新成mysql-connector-java-5.1.32-bin.jar就可以了。mysql-connector-java-5.1.32-bin.jar的下载地址为http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.32.tar.gz。下载完后解压,在解压的目录下可以找到mysql-connector-java-5.1.32-bin.jar
报错信息如下

14/12/03 16:37:58 ERROR manager.SqlManager: Error reading from database: java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@54b0a583 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.

java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@54b0a583 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
...
14/12/03 16:37:58 ERROR tool.ImportTool: Encountered IOException running import job: java.io.IOException: No columns to generate for ClassWriter
...

Understanding Indexing Without Needing to Understand Data Structures

Understanding Indexing

Without Needing to Understand Data Structures
MySQL UC 2011 – April 12, 2011
Zardosht Kasheff

什么是表?

(key,value)对集合的词典

1、确保你可以修改这个词典(插入、删除、修改)和查询(点查询、范围查询)词典
2、B-Tree和Fractal Tree是两个词典的例子
3、哈希则不是(不支持范围查询)
例子:
CREATE TABLE foo (a INT, b INT, c INT, PRIMARY KEY(a));
然后我们插入一批数据

a b c
100 5 45
101 92 2
156 56 45
165 6 2
198 202 56
206 23 252
256 56 2
412 43 45

一个key定义了词典的排序规则

1、对于数据结构和存储引擎我们会认为,在排序上进行范围查询是快速
2、在其它顺序上的范围查询会进行表的扫描,这个操作是很慢
3、点查询需要检索一个特定的值也是
一个点查询是快速的,但是读取一批行用这种方法会比使用按顺序的范围查询慢2个数量级

词典T上的索引I也是一个词典

1、同样我们需要定义一个(key,value)对
2、索引上的key是主词典上的列的子集
3、索引I的值是T的主KEY
还有其它的方法去定义这个值,但我们还是坚持使用T的主KEY来定义这个值

例子:
ALTER TABLE foo ADD KEY(b);
然后我们得到

Primary

a b c
100 5 45
101 92 2
156 56 45
165 6 2
198 202 56
206 23 252
256 56 2
412 43 45
Key(b)

b a
5 100
6 165
23 206
43 412
56 156
56 256
92 101
202 198

问题:COUNT(*) WHERE a<120;

100 5
101 92 2
=>
2

问题:COUNT(*) WHERE b>50;

56 156
56 256
92 101
202 198
=>
4

问题:SUM(c) WHERE b >50;

56 156
56 256
92 101
202 198
=>

156 56 45
256 56 2
101 92 2
198 202 56
=>
105

索引的好处?

1、索引使得查询变得快速
  索引会提升部分查询请求的速度
2、需要在心里根据查询设计索引
  选出最重要的查询给它们设计索引
  考虑索引本身的代价

选出最重要的查询给它们设计索引

设计一个好的索引有3个规则可以参考

避免任何数据结构的细节
  B树和分形树对于计算机科学家来说是有趣和好玩的,但这三个规则同样适用于其它数据结构
  所有我们需要考虑的是范围查询是快速的(对每行)而点查询则慢的多(对行)
世界上没有绝对的规则
  索引像是一个数据问题
  规则有帮助,但每个方案都有自己的问题,需要分析解决问题的方法
也就是说规则有很大的帮助

三个规则

1、查询较少的数据
  少的带宽,少的处理…
2、减少点查询
  数据访问成本是不一样的
  顺序访问数据比无序访问快的多
3、避免排序
  GROUP BY和ORDER BY查询需要后期的检索工作
  索引在这种查询中可以帮助获得RowID

我们来分别看下这三种规则

规则1:查询较少的数据

规则1:慢查询的例子
表:(100万行数据,没有索引)
  CREATE TABLE foo (a INT, b INT, c INT);
查询(匹配1000行数据)
  SELECT SUM(c) FROM foo WHERE b=10 and a<150;
查询计划:
  行b=10 AND a<150可以在表的任何地方
  没有索引的帮助,整个表都会被扫描
执行速度慢:
  检索100万行数据仅仅查询1000行数据

规则1:如何去添加一个索引
我们应该怎么去做?
  减少检索到的数据
  分析远少于100万的数据行
怎么做(对于一个简单的查询)?
  设计索引时着眼于WHERE子句
    由查询关注那些行来决定
    其它行的数据对于这个查询来说并不重要

对于查询 SELECT SUM(c) FROM foo WHERE b=10 and a<150;
规则1:那个索引?
选择1:Key(a)
选择2:Key(b)
那个更好?由select来决定:
  如果WHERE a<150的数据行更少,key(a)更好
  如果WHERE b=10的数据更少,key(b)更好

选择3:key(a) AND key(b),然后MERGE
  我们稍后会进行讨论

规则1:选择最好的索引

key(a)与key(b)都不是最佳的
考虑:
  WHERE a<150有20万行数据
  WHERE b=10有10万行数据
  WHERE b=10 AND a<150有1000行数据
然后key(a)和key(b)都会检查很多的数据

为了获得更好的性能,索引必须尝试尽量优化WHERE条件
  我们需要复合索引

复合索引可以减少数据检索
WHERE条件:b=5 AND a<150
  选择1:key(a, b)
  选择2:key(b, a)
问题又来了那种选择更好?
  Key(b, a)!
索引规则:
  当创建一个复合索引,需要对每个列进行检查,条件b是相等判断,但在a上不是。

问题:WHERE b=5 and a>150;

a b c
100 5 45
101 6 2
156 5 45
165 6 2
198 6 56
206 5 252
256 5 2
412 6 45
b,a a
5,100 100
5,156 156
5,206 206
5,256 256
6,101 101
6,165 165
6,198 198
6,412 412

复合索引:没有平等的条件
如果WHERE条件是这样的:
  WHERE a>100 AND a<200 AND b>100;
那个更好?
  key(a),key(b),key(a,b),key(b,a)?
索引规则:
  只要复合索引不被用于相等查询,复合索引的其它部分不会减少数据检索量
    key(a,b)不再比key(a)好
    key(b,a)不再比key(b)好

问题:WHERE b>=5 AND a>150;

a b c
100 5 45
101 6 2
156 5 45
165 6 2
198 6 56
206 5 252
256 5 2
412 6 45
b,a a
5,100 100
5,156 156
5,206 206
5,256 256
6,101 101
6,165 165
6,198 198
6,412 412
=>
5,156 156
5,206 206
5,256 256
6,101 101
6,165 165
6,198 198
6,412 412

复合索引:另一个例子
WHERE条件:b=5 AND c=100
key(b,a,c)和key(b)一样好,因为a没有在条件中使用,所以在索引中包含c并不会有帮助,key(b,c,a)会更好

a b c
100 5 100
101 6 200
156 5 200
165 6 100
198 6 100
206 5 200
256 5 100
412 6 100
5,100,100 100
5,156,200 156
5,206,200 206
5,256,100 256
6,101,200 101
6,165,100 165
6,198,100 198
6,412,100 412
=>
5,100,100 100
5,156,200 156
5,206,200 206
5,256,100 256

规则1:总结
  根据查询条件设计复合索引
  把相等条件的查询列放在复合索引的开始位置
  保证第一个非相等条件的列在索引中越有选择性越好
  如果复合索引的第一个列没有被相等条件来使用,或者没有在条件中使用,复合索引的其它列对于减少数据的检索没有帮助
    是否就表明它们是无用的呢?
    它们在规则2中可能有用

规则2:避免点查询

表:
  CREATE TABLE foo (a INT,b INT,c INT,PRIMARY KEY(a), KEY(b);
查询:
  SELECT SUM(c) FROM foo WHERE b>50;
查询计划:使用key(b)
  因为随机的点查询的原因,对每个行都进行检索的代价很大

问题:SUM(c) WHERE b>50;

a b c
100 5 45
101 92 2
156 56 45
165 6 2
198 202 56
206 23 252
256 56 2
412 43 45
b a
5 100
6 165
23 206
43 412
56 156
56 256
92 101
202 198
56 156
56 256
92 101
202 198
=>
156 56 45
256 56 2
101 92 2
198 202 56
=>
105

还是这张表,但查询计划不同了
查询计划:扫描主表
  每行的检索成本低
  但是需要检索很多行

问题:SUM(c) WEHRE b>50;

a b c
100 5 45
101 92 2
156 56 45
165 6 2
198 202 56
206 23 252
256 56 2
412 43 45
=>
105

如果我们添加了另一个索引会怎么样?
  如果添加key(b, c)呢?
  由于我们在b上有索引,我们只检索我们需要的行
  由于索引包含C的信息,我们不再需要再去检索主表了。没有点查询了

覆盖索引:索引覆盖一个查询,如果这个索引包含足够的信息来回答这个查询。
例子:
问:SELECT SUM(c) FROM foo WHERE b<100;
问:SELECT SUM(b) FROM foo WHERE b<100;
索引:
key(b, c): 对第一个查询是覆盖索引
key(b, d):对第二个查询是覆盖索引
key(b, c, d):对每个索引都是覆盖索引

如何去构建一个覆盖索引
把检索的每个列都包含进去,并不仅仅是查询条件
问:SELECT c,d FROM foo WHERE a=10 AND b=100;
错误:ADD INDEX(a, b)
  并不是覆盖索引。仍然需要点查询去检索c和d的值
正确:ADD INDEX(a, b, c, d)
  包含所有相关的列
  按照规则1规定把a和b放在索引开始位置

如果主键匹配WHERE条件呢?
问题:SELECT sum(c) FROM foo WHERE b>100 AND b<200;
SCHEMA:CREATE table foo (a INT, b INT, c INT, ai INT AUTO_INCREMENT, PRIMARY key(b, ai));
  查询在主词典上做了范围查询
  只有一个词典会按顺序访问到
  这个查询很快
主键覆盖所有查询
  如果排序规则匹配查询条件,问题得到解决

什么是聚簇索引
如果主键不匹配查询条件呢?
  理想的情况下,必须确保辅助索引包含所有列
  存储引擎不会让你去这样做
  有一个例外。。。。TokuDB可以
  TokuDB允许你定义任何聚簇索引
  一个聚簇索引包含所有的查询,就像主键做的一样

聚簇索引的实践
  问:SELECT SUM(c) FROM foo WHERE b<100;
  问:SELECT SUM(b) FROM foo WEHRE b>200;
  问:SELECT c,e FROM foo WEHRE b=1000;
  索引:
  key(b, c):第一个查询
  key(b, d):第二个查询
  key(b, c, e):第一个和第三个查询
  key(b, c, d, e):所有的三个查询

索引需要大量的查询分析
考虑那个会涵盖所有的查询:聚簇索引 b
聚簇索引可以让你更加关注于查询条件
  它们减少了点查询并且使查询更加快

聚簇索引更多的信息:索引合并
例子:
CREATE TABLE foo (a INT, b INT, c INT);
SELECT SUM(c) FROM foo WHERE b=10 AND a<150;
假设:
a<150有20万行数据
b=10有10万行数据
b=10 AND a<150有1000行数据
如果我们使用key(a)和key(b)然后把结果集合合并会怎样?
合并计划:
  查询20万行数据从key(a)中,where a<150
  查询10万行数据从key(b)中,where b=10
  合并结果集合,然后找到查询标识的1000行数据
  执行1000行数据的点查询去得到c
这比没有索引好一点
  和没有索引相比,减少了扫描行的数量
  和没有合并相比,减少了点查询的数量

那么聚簇索引会对合并有帮助么?
考虑key(a)是个聚簇索引
查询计划:
  扫描key(a)20万行数据,where a<150
  扫描结果集合得到b=10的结果
  使用得到的1000行的数据去检索C的值
一次得到,没有点查询
更好的选择还有没有?
  聚簇索引(b, a)!

规则2总结:
  避免点查询
  确保索引覆盖查询
    包含查询涉及到的所有列,并不仅仅是查询条件中的
  使用聚簇索引
    使用聚簇索引包含所有查询
    允许用户把关注点集中在查询条件上
    给多个查询提速,包括还没有预见到的查询--简化数据库设计

规则3:避免排序

简单的查询不需要后续的处理
  select * from foo where b=100;
仅仅取得数据然后返回给用户
复杂的查询需要对数据进行后续的处理
  GROUP BY 和 ORDER BY 会排序数据
选择正确的索引可以避免这些排序的步骤

考虑:
问:SELECT COUNT(c) FROM foo;
问:SELECT COUNT(c) FROM foo GROUP BY b, ORDER BY b;
查询计划1:
  当进行表扫描时,给C计数
查询计划2:
  扫描表把数据写到临时表中
  对临时表按B进行排序
  重新扫描排序后的数据,对每个b,使用c进行计数
如果我们使用key(b, c)会怎么样呢?
  通过添加国所有需要的字段,我们覆盖了查询。  快速
  通过提前对B进行排序,我们避免了排序 快速

总结:
  通过给GROUP BY或者ORDER BY使用预先排序了的索引

把它们都放到一起:

简单查询
  SELECT COUNT(*) FROM foo WHERE c=5, ORDER BY b;
  key(c, b):
    把c放在索引第一个位置去减少行检索 R1
    然后通过剩余的行排序去避免排序 R3
    因为相等的检查在c上,所以剩余的行数据会被按b排好序
  SELECT SUM(d) FROM foo WHERE c=100, GROUP BY b;
key(c, b, d):
    c在索引的第一个位置可以减少行数据的检索 R1
    然后其它的数据在b上排好序去避免查询 R3
    确保了查询覆盖所有的查询,避免了点查询 R2

在一些情况下,差没有明确的答案
  最优的索引是和数据相关的
问:SELECT COUNT(*) FROM foo WHERE c<100, GROUP BY b;
索引:
  key(c, b)
  key(b, c)
key(c, b)的查询计划:
  使用c<100对数据进行过滤
  仍然需要对数据进行排序
    *检索的行不会被b进行排序
    *查询条件不需要对c进行相等的检查,因此b的值分布在不同的c值块中
key(b, c)的查询计划
  按b进行排序,R3
  列WHERE c>=100同样需要处理,因此没有R1的优势

那个更好一些呢?
  答案依赖于数据是什么样的
  如果c>=100有更多的数据,节省时间不去检索无用的行。使用key(c, b)
  如果c>=100没有多少行,执行查询的时间是以排序为主,那么使用key(b, c)

问题的关键是,通常情况下,规则会有帮助,但它通常只是帮助我们去思考查询和索引,而不是给我们一个配方。

另一个重要的问题:为什么不把所有的加载都添加上索引呢?

需要跟上插入的负荷
更多的索引 = 更小的负荷

索引的代价:
空间:
 问题,每个索引都会增加存储的需求
 选项,使用压缩
性能:
 问题,B-trees在某些索引任务中执行的很快(内存中,顺序的key),但是在其它类型的索引(辅助索引)会慢20倍
 选项,分形树索引对于所有的索引类型都很快速,可以让消费者很容易索引,经常索引
范围查询性能:
 问题,规则2依赖于范围查询足够快
 问题,B-tree会比较容易碎片化(删除、随机插入…),碎片化的B-tee在范围查询上会变慢
 选项,对于B-tee可以优化表,导出然后导入(时间和离线)
 选项,对于Fractal Tree索引,不是问题,它不会碎片化

PS. 文档翻译自:
Understanding Indexing
Without Needing to Understand Data Structures
MySQL UC 2011 – April 12, 2011
Zardosht Kasheff

For more information…
• Please contact me at zardosht@tokutek.com for any
thoughts or feedback
• Please visit Tokutek.com for a copy of this presentation
to learn more about the power of indexing, read about
Fractal Tree indexes, or to download a free eval copy of
TokuDB

PS. 正确设计数据库索引并不只是DBA的事情,每个合格开发者都必须具备这样的能力,但最近老是发现乱使用索引和不使用索引的情况,很明显是不理解索引,希望这篇译文能够帮助到这部分开发者吧。另外还有一篇不错的文章,有时间也会翻译出来,敬请期待。

Python操作

操作json

dump和dumps的区别是dump会生成一个类文件对象,dumps会生成字符串
load和loads区别是load解析类文件对象,loads解析字符串格式的JSON

把json object的value由字符串改为[],load与loads,dump与dumps的区别

import json

def changejson(jstr):
    sdict = json.loads(jstr)
    # print sdict
    rdict = {}
    for k in sdict:
        item = []
        item.append(sdict[k])
        rdict[k] = item
    rjson = json.dumps(rdict)
    return rjson

print changejson('{}')

操作文件

# example 1:
t_file=open("/xxxx/xxx.txt", "r")
while 1:
    sline = t_file.readline().strip('\n')
    if not sline:
        break
    print "The Line is: %s" % (sline)
t_file.close()

# example 2:
# split file and hash with one filed
def splitfile(sfile, index, pre):
    f=open(sfile)
    files=[]
    for i in range(0, 64):
        files.append(open("/tmp/tmp/%s_%s" % (pre, i), "w"))

    while 1:
        line = f.readline().strip('\n')
        if not line:
            break
        row = line.split('\t')
        tid = row[index]
        htid = hash(tid)
        if (htid <= 0):
            htid = 0 - htid
        files[htid % 64].write("%s\n" %  (line))

    f.close()
    for i in range(0, 64):
        files[i].close()

#f_split("/tmp/tmp/jiaguo20150108_cpalog.dat", 1, "detail")

操作数据库

安装数据连接池模块DBUtils
# tar zxvf DBUtils-0.9.4.tar.gz
# cd DBUtils-0.9.4/
# python setup.py install

编写python脚本 Test.py

#!/usr/bin/python

import MySQLdb
from DBUtils.PooledDB import PooledDB

# 创建连接池
dbpool = PooledDB(creator = MySQLdb, maxusage=100, host="localhost",user="user",passwd="passwd",db="db",port=3306, charset="utf8")

# 使用
def findFromDb(sql):
    try:
        conn = dbpool.connection()
        cur = conn.cursor()
        count = cur.execute(sql)
        if (count > 0):
            # results = cur.fetchall()
            # for result in results:
                # ......
        cor.close()
        conn.close()
    except MySQLdb.Error,e:
        print "Mysql Error %d: %s" % (e.args[0], e.args[1])
        result = {}
        return result

print findFromDb("......")

JMX连接池

jmx连接断开后无法直接连接,而在每次使用都创建的话性能会比较差,我们可以使用apache的pool组件,它有检测连接的功能,当每次从连接池中请求连接后它会调用validateObject方法,检测这个连接是否正常,如果不正常的话会再调用makeObject方法生成新的连接,同时丢掉断开的连接。

我们先需要创建一个连接工厂,代码如下:

class JMXConnectionFactory
  extends KeyedPooledObjectFactory[String, JMXConnector] {

  @Override
  def makeObject(config: String): PooledObject[JMXConnector] = {
    val json: JsonObject = new JsonParser().parse(config).getAsJsonObject
    val host = json.getAsJsonPrimitive("host").getAsString
    val port = json.getAsJsonPrimitive("port").getAsString
    val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi".format(host, port))
    val environment = Maps.newHashMap[String, Array[String]]()
    if (json.has("username") && json.has("password")) {
      val credentials = Array(json.getAsJsonPrimitive("username").getAsString,
                              json.getAsJsonPrimitive("password").getAsString)
      environment.put(JMXConnector.CREDENTIALS, credentials)
    }
    val connect = JMXConnectorFactory.connect(url, environment)
    new DefaultPooledObject[JMXConnector](connect)
  }

  @Override
  def destroyObject(key: String, obj: PooledObject[JMXConnector]) = obj.getObject.close()

  @Override
  def validateObject(key: String, obj: PooledObject[JMXConnector]): Boolean = {
    var ret = false
    try {
      obj.getObject.getConnectionId
      ret = true
    } catch {
      case e: Throwable => {}
    }
    ret
  }

  @Override
  def activateObject(key: String, p: PooledObject[JMXConnector]) = {}

  @Override
  def passivateObject(key: String, p: PooledObject[JMXConnector]) = {}
}

然后需要创建一个pool

private val jmxPool = new GenericKeyedObjectPool[String, JMXConnector](new JMXConnectionFactory)
jmxPool.setTestOnBorrow(true)
jmxPool.setMaxIdlePerKey(-1)
jmxPool.setMinIdlePerKey(-1)
jmxPool.setMaxTotalPerKey(5)
jmxPool.setMaxTotal(100)
jmxPool.setTimeBetweenEvictionRunsMillis(1000 * 60 * 5)
jmxPool.setMinEvictableIdleTimeMillis(1000 * 60 * 5)

在gradle配置中需要添加相关的依赖

...
compile 'org.apache.commons:commons-pool2:2.2'
compile 'com.google.code.gson:gson:2.2.4'
...

Apache Kafka-0.8.1.1源码编译问题

使用Kafka里面自带的脚本进行编译
Kafka源码,里面带了gradlew的脚本,我们可以使用它编译Kafka源码:

$ git clone http://git-wip-us.apache.org/repos/asf/kafka.git
$ git checkout -b 0.8.1 origin/0.8.1
$ ./gradlew releaseTarGz

在运行这个命令之后,会报如下的错误:

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':core:signArchives'.
> Cannot perform signing task ':core:signArchives' because it has no configured signatory

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.

BUILD FAILED

网上都说使用下面命令执行可以正常打包,但试过了不行。

./gradlew releaseTarGzAll -x signArchives

其实可以在build.gradle文件signing块里面添加

required = false

修改后文件为

...
uploadArchives {
    repositories {
      signing {
        required false
        sign configurations.archives
...

关于这个配置的含义文档中有描述

Whether or not this task should fail if no signatory or signature type are configured at generation time. If required is a Callable, it will be stored and "called" on demand (i.e. when isRequired() is called) and the return value will be interpreting according to the Groovy Truth. For example:

 signing {
   required = { gradle.taskGraph.hasTask("uploadArchives") }
 }
 
Because the task graph is not known until Gradle starts executing, we must use defer the decision. We can do this via using a Closure (which is a Callable). For any other type, the value will be stored and evaluated on demand according to the Groovy Truth.
 signing {
   required = false
 }

配置上这个参数就可以正常打包了。下面是打包输出的信息。

/data/bin/jdk1.7.0_45/jre/bin/java -Dgradle.home=/home/jiaguotian/.gradle/wrapper/dists/gradle-1.6-bin/72srdo3a5eb3bic159kar72vok/gradle-1.6 -Dtools.jar=/data/bin/jdk1.7.0_45/jre/lib/tools.jar -Didea.launcher.port=7535 -Didea.launcher.bin.path=/data/app/idea-IC-129.1525/bin -Dfile.encoding=UTF-8 -classpath ..... 25/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain org.gradle.launcher.GradleMain --build-file /home/jiaguotian/workspace/kafka-apache/build.gradle releaseTarGz_2_10_1
The TaskContainer.add() method has been deprecated and is scheduled to be removed in Gradle 2.0. Please use the create() method instead.
Building project 'core' with Scala version 2.10.1
Building project 'perf' with Scala version 2.10.1
Building project 'campaign:queue-kafka' with Scala version 2.10.1
Building project 'campaign:monitor-kafka' with Scala version 2.10.1
Building project 'campaign:campaign-examples' with Scala version 2.10.1
:releaseTarGz_2_10_1
Building project 'core' with Scala version 2.10.1
Building project 'perf' with Scala version 2.10.1
Building project 'campaign:queue-kafka' with Scala version 2.10.1
Building project 'campaign:monitor-kafka' with Scala version 2.10.1
Building project 'campaign:campaign-examples' with Scala version 2.10.1
:kafka-apache:core:compileJava UP-TO-DATE
:kafka-apache:core:compileScala UP-TO-DATE
:kafka-apache:core:processResources UP-TO-DATE
:kafka-apache:core:classes UP-TO-DATE
:kafka-apache:core:copyDependantLibs UP-TO-DATE
:kafka-apache:core:jar UP-TO-DATE
:kafka-apache:core:javadoc UP-TO-DATE
:kafka-apache:core:javadocJar UP-TO-DATE
:kafka-apache:core:scaladoc UP-TO-DATE
:kafka-apache:core:scaladocJar UP-TO-DATE
:kafka-apache:core:srcJar UP-TO-DATE
:kafka-apache:core:compileTestJava UP-TO-DATE
:kafka-apache:core:compileTestScala UP-TO-DATE
:kafka-apache:core:processTestResources UP-TO-DATE
:kafka-apache:core:testClasses UP-TO-DATE
:kafka-apache:core:testJar UP-TO-DATE
:kafka-apache:core:signArchives SKIPPED
:kafka-apache:core:releaseTarGz

BUILD SUCCESSFUL

递归

递归是算法是简洁、优美、强大的,递归简单地说就是把自身越来越小的形式表示和解决。

递归计算机中是如何实现的呢?

我们需要对函数调用做些了解。一个可执行文件被加载到虚拟存储器中, 其中的数据由以下几个区域组成,程序和代码(代码段),堆,共享库和静态数据区,栈。当进程调用一个函数时,就需要在栈中保存和这个调用相关的信息。栈上的这部分空间我们称为栈帧。栈帧中包括输入参数、返回值的空间、计算使用到的临时空间(比如c语言函数内部使用到的变量,在函数返回后,因为栈帧被弹出而自动清除),调用函数保存的状态信息及输出参数等。一个方法的输出参数会成为下一个调用函数的输入参数。栈帧在调用函数时压入栈中,当函数返回时才从栈中弹出。

有上面的描述我们可以推出递归函数是如何执行的,递归递归,递推加回归,在递推过程时函数通过不断调用自己来执行,然后当其中的调用满足结束条件时,递推阶段结束,开始回归操作,在回归操作中,其以递推逆序的方式执行,直到最初的函数返回。

可以看到每次调用自身的时候都需要一个栈帧来保存现场,这会占用大量的空间来存储他们,同时也因为大量信息的保存和恢复,生成和销毁他们也要花费大量时间。

有没有办法可以消除这个缺点呢?有的,如果一个函数所有递归调用都出现在函数末尾,那么我们称其为尾递归。

尾递归可以替换成等价的迭代实现,不会改变程序结果,但可以节省函数调用的开销。现代的编译器通常能够检测到尾递归,当检测到时,其会覆盖当前栈帧,而不重新创建一个,这样空间和时间上都节省不少。

我们再考虑一种情况,如果递归函数的终止条件永远得不到满足,最后程序栈会增加到超过系统可以接受的最大限度,产生栈溢出而终止执行。

另外在很多系统实现时都会把递归的算法改成迭代的算法,也是出于对函数执行效率的考虑,比如数据库最重要的算法二分查找算法,快速排序算法,都是采用的迭代算法。

但是递归还是非常有用处的,递归的算法比较容易理解算法本身,所以在许多数据结构和算法的书中都会有以递归算法来讲解算法实现。

Vertica简要

数据分析工作负载VS事务性工作负载
证明商业和技术可靠的大规模分布式数据库,支持ACID、有效存储PB级别数据的系统

分析数据库的架构,关注与C-Store不同的点
实施和部署的经验,为什么引入那些不同
现实世界的经验可以引导未来大规模数据分析系统研究的方向

2.背景
2.1.1设计目标:分析工作负载不是事务工作负载
事务工作负载:每秒事务数多,每个事务会影响少数几个元组,大部分的事务是增加一个新行或者修改部分已存在的列
分析工作负载:每秒事务数少,每个事务检查很多表中的元组,比如分析用户的行为,行处理每秒很多

数据量级的增加,即使是小公司

最初就是设计一个分布式数据库
1、新节点加入系统性能有线性的扩展。使用共享磁盘的架构也能获得这样的扩展,但这很快会成为系统的瓶颈
2、优化和执行的引擎避免大量的网络数据传输,以避免内部互联成为系统瓶颈
3、查询和加载数据每秒都会有很多,必须关注支持高插入,否则只能有限的应用,批量的加载应该很快而又不能影响并行的查询
4、操作可在线,管理和维护任务不应该停止或暂停查询
5、易于使用是一个明确的目标。用CPU换时间,形式上减少复杂的网络和磁盘设置,减少性能的调优,自动化的物理设计和管理

3数据模型
3.1 (列)计划
属性排序子集(经过编码和压缩并按某种次序排序和分割的纵列集合)
加载过程中自动维护
每个Projection都有自己的排序,数据是完全排序的
任意数量、带有不同排序的推算或表列的子集是被允许的,可针对不同的查询进行优化。

不同排序规则,可被看成物化视图,但它们是物理数据结构,不是辅助索引。它不包含聚集、连接、额外的查询,认为它们在现实的分布式数据库中是不切实际的。

2.2连接索引,没有被实现,代价比带来的优势要少,实现复杂,执行代价在重现全元组在分布式查询时很高。
明确存储行ID,会点用很多空间,我们高效的压缩实现有助于减少这种开销,但没有计划实现它
3.3预连接推算
没有预期使用的多和重要:在小表连接操作上已经够好(高度优化的hash和合并算法),用户一般不愿意减慢
批量数据加载数据去优化查询速度。在加载数据时比连接查询时能够进行的优化机会要少,因为数据库在加载流中没有什么先验。

3.4 编码和压缩
不同列有不同的编码,同样的列在他们的每个推测中也可有不同的编码
自动:根据类型和配置,当使用场景不同确时
替换:低基数列

3.5 分区
c-store节点内水平分区,在单个节点使用并行提升性能
获得节点性并行并不需要硬盘物理分隔,在运行时逻辑分区,然后并行处理。尽管能够自动并行化,也提供根据value保持在物理上隔离
分区原因:快速批量删除(其它系统分区也是如此),否则需要查询所有物理文件要删除的行,添加删除标记,比起直接删文件慢的多
增加了存储的需求,在元组没有执行合并操作前影响查询性能。如果分区在所有推算上都一致,批量删除才是快速的,所以它是表层次的,而不是推算层次的

分区原因:增加查询性能,它保存最小值和最大值在每个ROS中,在做计划时就能够修剪容器。分区使这种技术更高效,分区使列数据不混合

3.6 分割:集群分布
可以指定列做hash到段
c-store根据projection的排序字段的第一个列分割物理存储到段
完全的分布式存储系统,分配存储元组到不同计算结点
节点内水平分区和节点外水平分区(分割)
分割对每个推算在排序方式上可能不同,推算分割提供决定把元组映射到节点,可以做许多重要的优化。(
完全本地分式连接,高效的分布式聚合,计算高基数不同的集合特别高效
复制推算在每个node上分布,分割推算一个元组在单个推算节点上存储
保持元组物理隔离为本地段,以方便集群的在线扩展与收缩

3.7读写优化存储
写在内存中,读在磁盘中
读:完整的依据推算排序排序的元组,列存储为一系列文件,两个文件1真实列数据2列数据索引
(大约1/1000,包含MetaData,如,开始位置,最大,最小值,固定不变故没有采用B-Tree)
写:内存,行列格式无关紧要,无编码和压缩,为缓冲数据操作(增、删、改)保证操作有足够多的行数以减少写的代价,
会随时间在行列模式下切换(和性能无关,只是软件工程考虑)
但它会以推算的分段表达式进行分段
3.7.1数据修改删除象量
不直接修改,当U和D操作时,创建D象量,它是要删除行的位置的列表,可能会有多个
DVWOS-》DVROS 高效的压缩
修改=删除+插入

4、无组移动:提高数据存储和查询效率
异步把数据从写优化到读优化,当WOS饱和mover操作没有完成之前,加载的数据直接入ROS,直到WOS重新获得足够的能力。mover平衡工作,避免产生小的ROS
读优化文件合并,减少ROS文件数量
写填满-》自动开始移动操作(随后的数据直接写到读优化,直到写有足够的空间)-》移动操作不能过多也不能过少
小文件影响压缩、减慢查询,需要更多文件句柄、seek和更多的全并排序文件的操作。
合并操作:回收已标记为删除的元组
没有限制ROS Containers大小,但它不会创建超过一定大小的(当前2T),足够大(每文件开销分摊,管理不笨重)
ROS层在Node间是私有的,不会在集群内协调

5、修改和事务
每个元组都和提交时间关联(纪元)(隐式64bit的列或删除象量)
所有节点在事务提交都同意其包含的纪元,就相当于有了全局的一致性快照,读无需要加锁
有一个分析工作量的表锁定模式
不使用传统的两阶段提交
共享锁:限制并发修改表,用来实现序列化隔离
插入锁:插入数据时到表使用,和自己兼容,支持匹量插入和加载同时发生,以保持高插入加载速度。但仍然提供事务语义
共享插入锁:读写
独占锁:删改
元组移动锁:和所有锁兼容除了X锁,在元组移动同时操作删除象量时
使用锁:
拥有锁:

分布式和组成员协议来协调集群内结点之间操作,使用广播和点对点来保证所有控制消息成功送达所有node
提交在集群中成功如果它在选定数据的结点上成功,ROS与WOS创建的事务完成后其它事务才能看到

5.1纪元管理
纪元模型:纪元包含给定时间窗口所有提交了的事务
Last Good Epoch:所有数据都成功从WOS移动到ROS
Ancient History Mark:

5.2宽容失败
Vertica的数据复制通过使用Projection分段机制,以提供容错
每个Projection,必须至少有一个伙伴projection含有相同的列和分割以确保任何行被存储在同一样节点的Projection,当节点故障,伙伴projection会做为故障节点的源。
不需要传统的事务日志,数据+纪元自己就做为过去系统活动日志。vertica使用这些历史数据,去重放节点丢失的DML。节点会从正常的伙伴projection数据段恢复数据,

恢复,更新,重新平衡和备份都是在线操作;在执行这些操作同时可进行数据的加载和查询。影响的只是计算和带宽资源。

5.3
以元数据目录在节点间管理状态,它记录着表、用户、节点、纪元等的信息。
没有保存在数据库表中,因为它的表设计无法恰当的通过catalog访问和修改。它采用自定义的内存结构中,然后通过它自己的事务机制保存到磁盘中。
k-safety:当有小于或等于K个节点故障,集群仍然可用。
数据库projection必须确保有K+1个每个段的拷贝在不同的节点上。当有一半节点故障,集群会保护性关闭。集群必须保证有n/2+1个节点确保脑裂后的两个集群继续提供服务。

6 查询的执行
私有扩展标准的SQL声明的查询语言。
Vertica的公司扩展设计使可轻松地查询在SQL时过于繁琐或不可能的时间序列和日志型数据。

6.1查询操作符和计划格式
执行引擎为完全天量化,并且在同时查询一块行数据而不是同时查询单一行
标准操作树,每个操作执行一个特定的算法。一个操作者的输出做为下面操作者的输入。
执行引擎是多线程和流水线的
在一个时间请求一块行数据而不是请求一条数据。
使用上拉处理模型,直到一个操作从磁盘或网络读到数据。
扫描、分组(有多个算法依据性能最大化、内存需求、操作是否必须产生单独的组决定使用那个,并且实现了管道聚合方式可选择是否保持数据编码)、连接(实现了hash join和merge join算法,能够在必要时外部化)、表达式执行、排序、分析、发送/接收

特别处理和复杂的实现,确保可直接操作未解码的数据,这对于扫描、joins、低级聚集操作很重要。

SIP:Sideways Infomation Passing,侧面消息传递,采用在查询计划中尽可能早的过滤数据来优化join性能。(可在优化查询计算时就构建SIP filter,然后在执行时使用。在执行时,扫描操作会扫描join的hash table,SIP会用来判断outer key值在hash 表中是否存在。)

运行期间分析数据调整算法(内存不够hashtable->sort-merge join;生成预处理操作,并发执行,最后根据它们的结果生成最终数据)

晚物化、压缩的成本和估算、流聚合、消除排序、合并连接、不可见索引(每列按选择性排序,不需要索引来减少IO和更快查找值),数据die dai(L2缓存,更好的利用二级缓存和多核并发的特性)

使用pipeline执行引擎带来挑战是共享公共资源,可能造成不必要的yi chu到磁盘。vartica会把计划分成多个不会在同一时间执行的区,下游操作会回收上游操作的资源。

不同表的projection的数据被复制到所有节点上或者在join key上分割相同的范围,使得计划可以在每个节点的内部执行,然后结果发送到客户端连接的结点。

6.2 优化

选择和连接projections,保证scan和join更快(保证之后join的数据被减少)

收集性能数据(压缩I/O,CPU,网络使用),

ToKuDB简要

介绍
TokuDB存储引擎是Tokutek2009年发布的一个数据库存储引擎,于2013年4月开源。支持MySQL/MariaDB。它和InnoDB一样支持事务、MVCC。

特色:
Franctal Tree而不是B-Tree
内部结点不仅有指向父子的指针还有Buffer区,数据写入先写buffer区,FIFO结构,写入只需要顺序添加到Buffer区就可返回,后续满时一次性刷新到下面的子树中,插入数据基本上是一个顺序添加的过程。可轻松应对随机IO,减少空间碎片。
出色的压缩性能
块大小默认是4MB
在线DDL

数据结构:
一、Buffered Tree:类B-Tree,写时直接写到Root结点,如果Root结点满了,就把数据刷新到它子结点上,如果子结点满了就继续刷新到子子结点,一直这样下去。因此,只有当结点满时才有Disk Seek产生。Node大小可设置很大,比如4MB,为提高读,需要对Node做更细划分,分成小块,随机读IO复杂度也为O(logN)。
写:
写时不产生disk seek,因为总是先写Root,由于经常被操作,可认为树顶部结点一直在Page Cache中。
节点数据紧凑,大Node压缩有优势
天然适合做事务,没有undo log,叶子结点上做mvcc

二、Fractal-Tree(Buffer-Tree的变种)
buffered(4,16)-tree,OMT结构维护结点数据,大小4MB,nonleaf节点OMT结构,leaf节点多个OMT(4MB/64KB)

OMT:Order Maintenance Tree
元素用数组表示,具有父子关系的元素尽量相邻存储,cpu cache line(如果一个节点的周边节点能在文件中紧邻的被存储,当读取其中一个的时候,其他节点被prefetching出来,io数则可以减少。这就是vEB layout)

结点刷新:Node完成写时检查是否满足flush条件,满足加到flush队列,后台线程并行处理从队列中读取的任务。

CheckPoint:60秒一次,sharp checkpoint无Fuzz Checkpoint,会对所有索引加只读锁,其它线程写node时,会clone一个Node。做完后检查LSN,以清理无用的log。基本上这个操作不影响前台读写操作,但是因为会进行数据压缩和clone node、写磁盘的操作,会造成一定的性能波动。

Cache:LRU,写cache时添加node到cache链表,然后检查cache状态,设置了四个水平位(低水平位,低警示点,高警示点,高水平位),高于高水平位,客户端线程进入等待状态,超过低警示点,开始收集逐出数据。

三、读:
KEY读:每次读数据都要从ROOT到LEAF,做数据合并,才能得到完整的ROW数据。
范围读:对树做深度优先遍历,在LEAF结点返回,痛苦。

四、Schema Changes
列修改:Broadcast类型的Message,从Root广播到每一节点,最后到达LEAF结点。(Mysql对列修改会先关闭表,再打开表,关闭时Tokudb会把脏Node写盘,有些性能消耗),腾讯的游戏运维部门在InnoDB上实现了类似的功能。

索引:两种索引,cst_(clustering index)和cvr_(covering index),每个都是一个单独的F-tree文件
offline方式:启动多个线程,遍历记录生成索引,速度快,但创建过程中写操作不可用。
hot方式:速度慢,创建过程读写不受影响

log:log manager来管理log文件,无重做日志组的概念,当日志写满后重新生成一个文件继续写,checkpoint后检查数据都被刷新到磁盘后会删除,达到InnoDB类似的效果;分in buffer和out buffer(都是16MB),在两块内存上来回倒,多线程下锁控制方便。

优点
高压缩比,写性能高,DDL速度超快
缺点
cpu usr态消耗,响应时间变长
场景
插入效率高、压缩效率好、可在线DDL,适合写性能要求高,数据量过亿,记录不是太大的场合。

大压缩比所以CPU的USR态有消耗
相同QPS时读IOPS差别不大,写IOPS InnoDB平均多消耗些
TokuDB数据经过压缩,但会有解压消耗,所以结果不好确定,测试上看响应时间高于InnoDB

IOPS (Input/Output Operations Per Second),即每秒进行读写(I/O)操作的次数。
QPS:Queries Per Second意思是“每秒查询率”,是一台服务器每秒能够相应的查询次数,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。