Elasticsearch

Elasticsearch installation

Deploy a 3-node cluster; each node has 8 CPU cores and 8 GB of memory.

Configure the hosts file

# /etc/hosts
192.168.1.1  node-1 
192.168.1.2  node-2
192.168.1.3  node-3

Install

# elasticsearch-7.2.0-linux-x86_64.tar.gz
# Extract:
tar zxvf elasticsearch-7.2.0-linux-x86_64.tar.gz
ln -s /data1/elasticsearch-7.2.0 /data1/elasticsearch
# Create the data and log directories
mkdir -p /data1/data/elasticsearch
mkdir -p /data1/logs/elasticsearch

Edit the configuration files

# cp /data1/elasticsearch/config/elasticsearch.yml /data1/elasticsearch/config/elasticsearch.yml.bak
# cp /data1/elasticsearch/config/jvm.options /data1/elasticsearch/config/jvm.options.bak
 
# Configuration file: /data1/elasticsearch/config/elasticsearch.yml
cluster.name: tech-log-es-online
node.name: node-1
node.attr.rack: r1
node.attr.box_type: hot
path.data: /data1/data/elasticsearch
path.logs: /data1/logs/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.1.1
http.port: 9200
discovery.seed_hosts: ["node-1", "node-2", "node-3"]
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]
gateway.recover_after_nodes: 3
action.destructive_requires_name: true
 
 
cluster.name: tech-log-es-online
node.name: node-2
node.attr.rack: r1
node.attr.box_type: hot
path.data: /data1/data/elasticsearch
path.logs: /data1/logs/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.1.2
http.port: 9200
discovery.seed_hosts: ["node-1", "node-2", "node-3"]
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]
gateway.recover_after_nodes: 3
action.destructive_requires_name: true
 
 
cluster.name: tech-log-es-online
node.name: node-3
node.attr.rack: r1
node.attr.box_type: hot
path.data: /data1/data/elasticsearch
path.logs: /data1/logs/elasticsearch
bootstrap.memory_lock: true
network.host: 192.168.1.3
http.port: 9200
discovery.seed_hosts: ["node-1", "node-2", "node-3"]
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]
gateway.recover_after_nodes: 3
action.destructive_requires_name: true
 
# Configuration file /data1/elasticsearch/config/jvm.options: only the JVM heap size is changed
-Xms4g
-Xmx4g

Adjust system settings

# Setting 1:
Edit /etc/sysctl.conf and add:
vm.max_map_count=262144
Apply it immediately with: sysctl -w vm.max_map_count=262144
 
Otherwise startup fails with:
ERROR: [2] bootstrap checks failed
[1]: memory locking requested for elasticsearch process but memory is not locked
[2]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
 
# Setting 2:
Create the file /etc/systemd/system/elasticsearch.service.d/override.conf (only relevant when Elasticsearch is managed by systemd) with the content:
[Service]
LimitMEMLOCK=infinity
 
# Setting 3:
Edit /etc/security/limits.conf and add:
# allow user 'work' mlockall
work soft memlock unlimited
work hard memlock unlimited
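
Before starting, it is worth confirming that the settings above actually took effect; a minimal verification sketch, run as the service user (assumed here to be work, as in limits.conf above):

# should print 262144
sysctl -n vm.max_map_count
# should print "unlimited", otherwise bootstrap.memory_lock will fail
ulimit -l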

Start the service

./bin/elasticsearch -d
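
Once all three nodes have been started this way, the cluster state can be checked from any host; a minimal sketch, assuming node-1's HTTP endpoint from the configuration above:

curl -s 'http://192.168.1.1:9200/_cluster/health?pretty'
# expect "status" : "green" and "number_of_nodes" : 3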

Configuration file reference:

# Cluster name
cluster.name: tech-log-es-online
# Name of this node
node.name: node-3
node.attr.rack: r1
# Box type this node belongs to: hot or warm
node.attr.box_type: hot
# Data and log directories
path.data: /data1/data/elasticsearch
path.logs: /data1/logs/elasticsearch
# Lock the process memory to avoid the performance loss caused by swapping
bootstrap.memory_lock: true
# Hostname or IP address the node binds to; it is also advertised to the other nodes in the cluster
network.host: 192.168.1.3
http.port: 9200
discovery.seed_hosts: ["node-1", "node-2", "node-3"]
# Hostnames or IP addresses of the master-eligible nodes
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]
# Only start recovering data once N nodes of the cluster have come up
gateway.recover_after_nodes: 3
# Whether indices can be deleted or closed via wildcards or _all; the default true means index names must be given explicitly
action.destructive_requires_name: true
 
 
# Node role settings
# By default both node.master and node.data are true, i.e. the node is master-eligible and also a data node
 
# node.master: true # Whether this node is master-eligible. The master creates and deletes indices, allocates shards and tracks the state of the other nodes; as the cluster grows, dedicated master-eligible nodes are recommended so that heavily loaded data nodes cannot make the master unresponsive
 
# node.data: true # Whether this node is a data node. Data nodes carry the data operations; indexing and searching use a lot of CPU, memory and IO, so their load is usually high
 
# If both node.master and node.data are false, the node is a client (coordinating-only) node, which only handles user requests, forwarding them and doing load balancing
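
To cross-check the role discussion above against a running cluster, the _cat API can be queried; a small illustrative example (the IP is node-1 from this setup, the output depends on the actual cluster):

curl -s 'http://192.168.1.1:9200/_cat/nodes?v&h=ip,name,node.role,master'
# node.role lists letters such as d (data), i (ingest), m (master-eligible); the elected master is marked with *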

Further reading:
Elasticsearch node types and their responsibilities


DataNode

Source-code analysis based on the community 2.7.2 branch

I. Analysis results

Optimization opportunities and the corresponding community patches

  1. At DataNode startup, adjust the lock granularity used when initializing FsDatasetImpl (HDFS-10682)
    • https://issues.apache.org/jira/browse/HDFS-10682
    • Replace FsDatasetImpl object lock with a separate lock object

      This Jira proposes to replace the FsDatasetImpl object lock with a separate lock object. Doing so will make it easier to measure lock statistics like lock held time and warn about potential lock contention due to slow disk operations.
      Right now we can use org.apache.hadoop.util.AutoCloseableLock. In the future we can also consider replacing the lock with a read-write lock.

  2. At DataNode startup, the checkDiskError() pass that detects and evicts failed paths can run its checks in parallel and does not need the lock (HDFS-11086)
    • https://issues.apache.org/jira/browse/HDFS-11086
    • DataNode disk check improvements

      This Jira tracks a few improvements to DataNode’s usage of DiskChecker to address the following problems:

      • Checks are serialized so a single slow disk can indefinitely delay checking the rest.
      • Related to 1, no detection of stalled checks.
      • Lack of granularity. A single IO error initiates checking all disks.
      • Inconsistent activation. Some DataNode IO failures trigger disk checks but not all.
  3. The DataNode's memory footprint can be optimized to improve performance (HDFS-9260, HDFS-8859)
  4. Remove redundant replica copy/write operations
  5. DataNode ReplicaMap lock usage (HDFS-10828)
    • https://issues.apache.org/jira/browse/HDFS-10828
    • Fix usage of FsDatasetImpl object lock in ReplicaMap

      HDFS-10682 replaced the FsDatasetImpl object lock with a separate reentrant lock but missed updating an instance ReplicaMap still uses the FsDatasetImpl.

  6. At DataNode startup, getAllVolumesMap can be initialized from a file cache; this requires serializing the volume map to a file when the DataNode shuts down (HDFS-7928)
    • https://issues.apache.org/jira/browse/HDFS-7928
    • Scanning blocks from disk during rolling upgrade startup takes a lot of time if disks are busy

      We observed this issue in rolling upgrade to 2.6.x on one of our cluster.
      One of the disks was very busy and it took long time to scan that disk compared to other disks.
      Seeing the sar (System Activity Reporter) data we saw that the particular disk was very busy performing IO operations.
      Requesting for an improvement during datanode rolling upgrade.
      During shutdown, we can persist the whole volume map on the disk and let the datanode read that file and create the volume map during startup after rolling upgrade.
      This will not require the datanode process to scan all the disk and read the block.
      This will significantly improve the datanode startup time.

  7. Rate-limit the DataNodes' full block reports through the NameNode's heartbeat messages (HDFS-7923)
    • https://issues.apache.org/jira/browse/HDFS-7923
    • The DataNodes should rate-limit their full block reports by asking the NN on heartbeat messages (cmccabe)

      The DataNodes should rate-limit their full block reports. They can do this by first sending a heartbeat message to the NN with an optional boolean set which requests permission to send a full block report. If the NN responds with another optional boolean set, the DN will send an FBR… if not, it will wait until later. This can be done compatibly with optional fields.

Implementation

HDFS-7923

The DataNode implements sendHeartBeat and blockReport in BPServiceActor.offerService, which runs as a periodic loop. When it detects that fullBlockReportLeaseId == 0 and the next report is due (nextBlockReportTime - curTime <= 0) [note: on the first heartbeat fullBlockReportLeaseId is 0, and it is reset to 0 after every report is sent; nextBlockReportTime defaults to the current time and is advanced to the next report time after each report], it adds a request for a block report lease to the heartbeat request (HeartbeatRequestProto.requestFullBlockReportLease). The NN (NameNodeRpcServer, BlockManager, BlockReportLeaseManager) then checks whether a lease can be granted to this DN; if so, the heartbeat response carries the lease id (HeartbeatResponseProto.fullBlockReportLeaseId). Once the DN holds the lease, it may send its block report to the NN.

The NN's BlockReportLeaseManager maintains two doubly linked lists (deferredHead and pendingHead); pendingHead holds the DataNodes that currently own a report lease. Lease issuance is controlled via numPending and maxPending: when a DataNode obtains a lease it is moved from deferredHead to pendingHead, and it moves back in the opposite direction when it loses the lease or finishes its report. A lease is valid for dfs.namenode.full.block.report.lease.length.ms (default 5L * 60L * 1000L, i.e. 5 minutes), and at most dfs.namenode.max.full.block.report.leases DataNodes (default 6) may hold a lease at the same time.

The NameNode remains compatible with DataNodes that do not use the rate limiting: such DNs heartbeat with the old protocol, the requestFullBlockReportLease field is never set, so the NN sees its default value false, and when it is false the NN does not ask the BlockManager for a report lease id.
The DN decides whether to send a BR by comparing the next scheduled time with the current time (the next scheduled time is initially set to the Scheduler object's initialization time, which guarantees that a BR is sent after the first heartbeat). The BlockReportContextProto.leaseId parameter defaults to 0, and when leaseId is 0 the NN does not check the lease.

Upgrade steps:
Upgrade and restart the NameNode first, then perform a rolling upgrade of the DataNodes.
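
For reference, the stock rolling-upgrade command sequence looks roughly like this; a sketch of the standard Apache procedure, not something prescribed by this patch, with host and port as placeholders:

hdfs dfsadmin -rollingUpgrade prepare
hdfs dfsadmin -rollingUpgrade query                      # repeat until it says to proceed
# then, DataNode by DataNode: stop it for upgrade, upgrade the software, restart it
hdfs dfsadmin -shutdownDatanode <DN_HOST:IPC_PORT> upgrade
hdfs dfsadmin -getDatanodeInfo <DN_HOST:IPC_PORT>        # confirms the DataNode has stopped
# once every node runs the new version
hdfs dfsadmin -rollingUpgrade finalize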

The execution flow is shown in the following diagram:
(figure: hdfs-7923)

# Two new configuration properties are added
<property>
  <name>dfs.namenode.max.full.block.report.leases</name>
  <value>6</value>
  <description>The maximum number of leases for full block reports that the
    NameNode will issue at any given time.  This prevents the NameNode from
    being flooded with full block reports that use up all the RPC handler
    threads.  This number should never be more than the number of RPC handler
    threads or less than 1.
  </description>
</property>
 
<property>
  <name>dfs.namenode.full.block.report.lease.length.ms</name>
  <value>300000</value>
  <description>
    The number of milliseconds that the NameNode will wait before invalidating
    a full block report lease.  This prevents a crashed DataNode from
    permanently using up a full block report lease.
  </description>
</property>

II. Code walkthrough

Worker threads started at startup

1. Communication: BPServiceActor, IpcServer, DataXceiverServer, localDataXceiverServer
2. Monitoring: JVMPauseMonitor, DU (dfsUsage)
3. Other: InfoServer

Entry point

# org.apache.hadoop.hdfs.server.datanode.DataNode
public static void main(String args[]) {
	if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
	  System.exit(0);
	}
 
	secureMain(args, null);
}
 
public static void secureMain(String args[], SecureResources resources) {
	int errorCode = 0;
	try {
	  StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
	  // Create the DataNode
	  DataNode datanode = createDataNode(args, null, resources);
	  if (datanode != null) {
	    // Note: this only waits for the BPServiceActor threads to finish
	    datanode.join();
	  } else {
	    errorCode = 1;
	  }
	} catch (Throwable e) {
	  LOG.fatal("Exception in secureMain", e);
	  terminate(1, e);
	} finally {
	  // We need to terminate the process here because either shutdown was called
	  // or some disk related conditions like volumes tolerated or volumes required
	  // condition was not met. Also, In secure mode, control will go to Jsvc
	  // and Datanode process hangs if it does not exit.
	  LOG.warn("Exiting Datanode");
	  terminate(errorCode);
	}
}
 
  public static DataNode createDataNode(String args[], Configuration conf,
      SecureResources resources) throws IOException {
    // Initialize the DataNode and start some of its worker threads
    DataNode dn = instantiateDataNode(args, conf, resources);
    if (dn != null) {
      // Start the remaining worker threads: dataXceiverServer, localDataXceiverServer, ipcServer, etc.
      dn.runDatanodeDaemon();
    }
    return dn;
  }
 
   public static DataNode instantiateDataNode(String args [], Configuration conf,
      SecureResources resources) throws IOException {
    if (conf == null)
      conf = new HdfsConfiguration();
 
    if (args != null) {
      // parse generic hadoop options
      GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
      args = hParser.getRemainingArgs();
    }
 
    if (!parseArguments(args, conf)) {
      printUsage(System.err);
      return null;
    }
    // dfs.datanode.data.dir can be a comma-separated list, e.g. [Disk]/storages/storage1/
    Collection<StorageLocation> dataLocations = getStorageLocations(conf);
    UserGroupInformation.setConfiguration(conf);
    SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
        DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
    return makeInstance(dataLocations, conf, resources);
  }
 
  /**
   * Make sure that at least one of the given data directories can be created (including missing parent directories), then instantiate the DataNode
   */
  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
      Configuration conf, SecureResources resources) throws IOException {
    LocalFileSystem localFS = FileSystem.getLocal(conf);
    FsPermission permission = new FsPermission(
        conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                 DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
    // Disk checker: creates the local directories when needed, checks permissions and makes sure the directories can be read and written
    DataNodeDiskChecker dataNodeDiskChecker =
        new DataNodeDiskChecker(permission);
    // Keep only directories with rwx access; TODO: directories without rwx still need to be checked
    List<StorageLocation> locations =
        checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
    DefaultMetricsSystem.initialize("DataNode");
 
    assert locations.size() > 0 : "number of data directories should be > 0";
    return new DataNode(conf, locations, resources);
  }
 
  /**
   * Create the DataNode given a configuration, an array of dataDirs,
   * and a namenode proxy
   */
  DataNode(final Configuration conf,
           final List<StorageLocation> dataDirs,
           final SecureResources resources) throws IOException {
    ......
    try {
      hostName = getHostName(conf);
      LOG.info("Configured hostname is " + hostName);
      startDataNode(conf, dataDirs, resources);
    } catch (IOException ie) {
      shutdown();
      throw ie;
    }
    ......
  }
 
  void startDataNode(Configuration conf,
                     List<StorageLocation> dataDirs,
                     SecureResources resources
                     ) throws IOException {
 
    // settings global for all BPs in the Data Node
    this.secureResources = resources;
    synchronized (this) {
      this.dataDirs = dataDirs;
    }
    this.conf = conf;
    this.dnConf = new DNConf(conf);
    checkSecureConfig(dnConf, conf, resources);
 
    this.spanReceiverHost =
      SpanReceiverHost.get(conf, DFSConfigKeys.DFS_SERVER_HTRACE_PREFIX);
 
    if (dnConf.maxLockedMemory > 0) {
      if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
        throw new RuntimeException(String.format(
            "Cannot start datanode because the configured max locked memory" +
            " size (%s) is greater than zero and native code is not available.",
            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
      }
      if (Path.WINDOWS) {
        NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
      } else {
        long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
        if (dnConf.maxLockedMemory > ulimit) {
          throw new RuntimeException(String.format(
            "Cannot start datanode because the configured max locked memory" +
            " size (%s) of %d bytes is more than the datanode's available" +
            " RLIMIT_MEMLOCK ulimit of %d bytes.",
            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
            dnConf.maxLockedMemory,
            ulimit));
        }
      }
    }
    LOG.info("Starting DataNode with maxLockedMemory = " +
        dnConf.maxLockedMemory);
 
    // Initialize DataStorage
    storage = new DataStorage();
 
    // global DN settings
    // Register the MXBean (JMX)
    registerMXBean();
    // https://blog.csdn.net/lipeng_bigdata/article/details/50828066
    // Initialize the DataXceiverServer, the streaming service that receives and sends data blocks for clients and other DataNodes; it is started in DataNode#runDatanodeDaemon()
    initDataXceiver(conf);
    // Start the InfoServer (web UI)
    startInfoServer(conf);
    // Start the JvmPauseMonitor, which monitors JVM pauses; its results can be queried via JMX
    pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();
 
    // BlockPoolTokenSecretManager is required to create ipc server.
    this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
 
    // Login is done by now. Set the DN user name.
    dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
    LOG.info("dnUserName = " + dnUserName);
    LOG.info("supergroup = " + supergroup);
    // Initialize the IPC server; it is started in DataNode#runDatanodeDaemon()
    initIpcServer(conf);
 
    metrics = DataNodeMetrics.create(conf, getDisplayName());
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
 
    // Initialize the BlockPoolManager
    blockPoolManager = new BlockPoolManager(this);
    blockPoolManager.refreshNamenodes(conf);
 
    // Create the ReadaheadPool from the DataNode context so we can
    // exit without having to explicitly shutdown its thread pool.
    readaheadPool = ReadaheadPool.getInstance();
    saslClient = new SaslDataTransferClient(dnConf.conf,
        dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
    saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
  }

BlockPoolManager & BPOfferService

1. BlockPoolManager: manages all block pools on the DataNode. Creating, deleting, starting, stopping and shutting down these objects must go through this class's API. Every DataNode has one BlockPoolManager instance.
2. BPOfferService: one instance per block pool/namespace on a DataNode; it handles the heartbeats to that namespace's active and standby NNs. The class manages one BPServiceActor per NN and delegates calls to both of them. It also keeps track of which NN is considered active.

private void doRefreshNamenodes(
      Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
    assert Thread.holdsLock(refreshNamenodesLock);
 
    Set<String> toRefresh = Sets.newLinkedHashSet();
    Set<String> toAdd = Sets.newLinkedHashSet();
    Set<String> toRemove;
 
    synchronized (this) {
      // Step 1. For each of the new nameservices, figure out whether
      // it's an update of the set of NNs for an existing NS,
      // or an entirely new nameservice.
      // At startup bpByNameserviceId is an empty collection
      for (String nameserviceId : addrMap.keySet()) {
        if (bpByNameserviceId.containsKey(nameserviceId)) {
          toRefresh.add(nameserviceId);
        } else {
          toAdd.add(nameserviceId);
        }
      }
 
      // Step 2. Any nameservices we currently have but are no longer present
      // need to be removed.
      toRemove = Sets.newHashSet(Sets.difference(
          bpByNameserviceId.keySet(), addrMap.keySet()));
 
      assert toRefresh.size() + toAdd.size() ==
        addrMap.size() :
          "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
          "  toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
          "  toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);
 
 
      // Step 3. Start new nameservices
      if (!toAdd.isEmpty()) {
        LOG.info("Starting BPOfferServices for nameservices: " +
            Joiner.on(",").useForNull("<default>").join(toAdd));
 
        for (String nsToAdd : toAdd) {
          ArrayList<InetSocketAddress> addrs =
            Lists.newArrayList(addrMap.get(nsToAdd).values());
          // Create the BPOfferService
          BPOfferService bpos = createBPOS(addrs);
          // Add it to bpByNameserviceId, so next time it shows up in the toRefresh set
          bpByNameserviceId.put(nsToAdd, bpos);
          offerServices.add(bpos);
        }
      }
      // Start all BPOfferServices, which actually starts the BPServiceActors below
      startAll();
    }
 
    // Step 4. Shut down old nameservices. This happens outside
    // of the synchronized(this) lock since they need to call
    // back to .remove() from another thread
    if (!toRemove.isEmpty()) {
      LOG.info("Stopping BPOfferServices for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toRemove));
 
      for (String nsToRemove : toRemove) {
        BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
        bpos.stop();
        bpos.join();
        // they will call remove on their own
      }
    }
 
    // Step 5. Update nameservices whose NN list has changed
    if (!toRefresh.isEmpty()) {
      LOG.info("Refreshing list of NNs for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toRefresh));
 
      for (String nsToRefresh : toRefresh) {
        BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
        ArrayList<InetSocketAddress> addrs =
          Lists.newArrayList(addrMap.get(nsToRefresh).values());
        bpos.refreshNNList(addrs);
      }
    }
  }
 
  BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
    Preconditions.checkArgument(!nnAddrs.isEmpty(),
        "Must pass at least one NN.");
    this.dn = dn;
 
    for (InetSocketAddress addr : nnAddrs) {
      this.bpServices.add(new BPServiceActor(addr, this));
    }
  }

BPServiceActor

startAll(); # starts all the BPOfferServices; each BPOfferService in turn starts its BPServiceActor's bpThread, whose run() logic is:

1. Perform the first handshake with the NameNode to obtain the namespace info
2. Register this DataNode with the NameNode
3. Periodically send heartbeats to the NameNode, plus incremental block reports, full block reports, cache reports, etc.
4. Process the commands returned by the NameNode

  # BPServiceActor.run()
  public void run() {
    LOG.info(this + " starting to offer service");
 
    try {
      while (true) {
        // init stuff
        try {
          // setup storage
          // Register with the NameNode
          // TODO: optimization candidate
          connectToNNAndHandshake();
          break;
        } catch (IOException ioe) {
          // Initial handshake, storage recovery or registration failed
          runningState = RunningState.INIT_FAILED;
          if (shouldRetryInit()) {
            // Retry until all namenode's of BPOS failed initialization
            LOG.error("Initialization failed for " + this + " "
                + ioe.getLocalizedMessage());
            sleepAndLogInterrupts(5000, "initializing");
          } else {
            runningState = RunningState.FAILED;
            LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
            return;
          }
        }
      }
 
      runningState = RunningState.RUNNING;
 
      while (shouldRun()) {
        try {
          offerService();
        } catch (Exception ex) {
          LOG.error("Exception in BPOfferService for " + this, ex);
          sleepAndLogInterrupts(5000, "offering service");
        }
      }
      runningState = RunningState.EXITED;
    } catch (Throwable ex) {
      LOG.warn("Unexpected exception in block pool " + this, ex);
      runningState = RunningState.FAILED;
    } finally {
      LOG.warn("Ending block pool service for: " + this);
      cleanUp();
    }
  }
 
  private void connectToNNAndHandshake() throws IOException {
    // get NN proxy
    bpNamenode = dn.connectToNN(nnAddr);
 
    // First phase of the handshake with NN - get the namespace
    // info.
    NamespaceInfo nsInfo = retrieveNamespaceInfo();
 
    // Verify that this matches the other NN in this HA pair.
    // This also initializes our block pool in the DN if we are
    // the first NN connection for this BP.
    bpos.verifyAndSetNamespaceInfo(nsInfo);
 
    // Second phase of the handshake with the NN.
    register(nsInfo);
  }
 
  # BPOfferService.verifyAndSetNamespaceInfo(NamespaceInfo nsInfo)
  /**
   * Called by the BPServiceActors when they handshake to a NN.
   * If this is the first NN connection, this sets the namespace info
   * for this BPOfferService. If it's a connection to a new NN, it
   * verifies that this namespace matches (eg to prevent a misconfiguration
   * where a StandbyNode from a different cluster is specified)
   */
  void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
    writeLock();
    try {
      if (this.bpNSInfo == null) {
        this.bpNSInfo = nsInfo;
        boolean success = false;
 
        // Now that we know the namespace ID, etc, we can pass this to the DN.
        // The DN can now initialize its local storage if we are the
        // first BP to handshake, etc.
        try {
          dn.initBlockPool(this);
          success = true;
        } finally {
          if (!success) {
            // The datanode failed to initialize the BP. We need to reset
            // the namespace info so that other BPService actors still have
            // a chance to set it, and re-initialize the datanode.
            this.bpNSInfo = null;
          }
        }
      } else {
        checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
            "Blockpool ID");
        checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
            "Namespace ID");
        checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
            "Cluster ID");
      }
    } finally {
      writeUnlock();
    }
  }

dn.initBlockPool(this);# DataNode.initBlockPool

# DataNode.initBlockPool(BPOfferService bpos)
/**
   * One of the Block Pools has successfully connected to its NN.
   * This initializes the local storage for that block pool,
   * checks consistency of the NN's cluster ID, etc.
   *
   * If this is the first block pool to register, this also initializes
   * the datanode-scoped storage.
   *
   * @param bpos Block pool offer service
   * @throws IOException if the NN is inconsistent with the local storage.
   */
  void initBlockPool(BPOfferService bpos) throws IOException {
    NamespaceInfo nsInfo = bpos.getNamespaceInfo();
    if (nsInfo == null) {
      throw new IOException("NamespaceInfo not found: Block pool " + bpos
          + " should have retrieved namespace info before initBlockPool.");
    }
 
    setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
 
    // Register the new block pool with the BP manager.
    blockPoolManager.addBlockPool(bpos);
 
    // In the case that this is the first block pool to connect, initialize
    // the dataset, block scanners, etc.
    initStorage(nsInfo);
 
    // Exclude failed disks before initializing the block pools to avoid startup
    // failures.
    checkDiskError();
 
    data.addBlockPool(nsInfo.getBlockPoolID(), conf);
    blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
    initDirectoryScanner(conf);
  }

initStorage(nsInfo);

  # DataNode.initStorage(final NamespaceInfo nsInfo)
  // 8c0638471f8f1dd47667b2d6727d4d2d54e4b48c Tue Aug 09 03:02:53 CST 2016    Arpit Agarwal   HADOOP-10682. Replace FsDatasetImpl object lock with a separate lock object. (Chen Liang)
  // TODO: review the implementation of initStorage(nsInfo)
  /**
   * Initializes the {@link #data}. The initialization is done only once, when
   * handshake with the the first namenode is completed.
   */
  private void initStorage(final NamespaceInfo nsInfo) throws IOException {
    final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory
        = FsDatasetSpi.Factory.getFactory(conf);
 
    if (!factory.isSimulated()) {
      final StartupOption startOpt = getStartupOption(conf);
      if (startOpt == null) {
        throw new IOException("Startup option not set.");
      }
      final String bpid = nsInfo.getBlockPoolID();
      //read storage info, lock data dirs and transition fs state if necessary
      synchronized (this) {
        storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt);
      }
      final StorageInfo bpStorage = storage.getBPStorage(bpid);
      LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
          + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
          + ";nsInfo=" + nsInfo + ";dnuuid=" + storage.getDatanodeUuid());
    }
 
    // If this is a newly formatted DataNode then assign a new DatanodeUuid.
    checkDatanodeUuid();
 
    synchronized(this)  {
      if (data == null) {
        data = factory.newInstance(this, storage, conf);
      }
    }
  }
 
  // The factory is initialized as follows
  /**
 * A factory for creating {@link FsDatasetImpl} objects.
 */
public class FsDatasetFactory extends FsDatasetSpi.Factory<FsDatasetImpl> {
  @Override
  public FsDatasetImpl newInstance(DataNode datanode,
      DataStorage storage, Configuration conf) throws IOException {
    return new FsDatasetImpl(datanode, storage, conf);
  }
}
 
// Look at the FsDatasetImpl constructor
/**
   * An FSDataset has a directory where it loads its data files.
   */
  FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
      ) throws IOException {
    this.fsRunning = true;
    this.datanode = datanode;
    this.dataStorage = storage;
    this.conf = conf;
    // The number of volumes required for operation is the total number
    // of volumes minus the number of failed volumes we can tolerate.
    final int volFailuresTolerated =
      conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
                  DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
 
    String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
    Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
    List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos(
        dataLocations, storage);
 
    int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
    int volsFailed = volumeFailureInfos.size();
    this.validVolsRequired = volsConfigured - volFailuresTolerated;
 
    if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
      throw new DiskErrorException("Invalid volume failure "
          + " config value: " + volFailuresTolerated);
    }
    if (volsFailed > volFailuresTolerated) {
      throw new DiskErrorException("Too many failed volumes - "
          + "current valid volumes: " + storage.getNumStorageDirs()
          + ", volumes configured: " + volsConfigured
          + ", volumes failed: " + volsFailed
          + ", volume failures tolerated: " + volFailuresTolerated);
    }
 
    storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
    volumeMap = new ReplicaMap(this);
    ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
 
    @SuppressWarnings("unchecked")
    final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
        ReflectionUtils.newInstance(conf.getClass(
            DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
            RoundRobinVolumeChoosingPolicy.class,
            VolumeChoosingPolicy.class), conf);
    // Build the volumes object
    volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
        blockChooserImpl);
    asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
    asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
    deletingBlock = new HashMap<String, Set<Long>>();
    // Add each volume to the volumes object
    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      addVolume(dataLocations, storage.getStorageDir(idx));
    }
    setupAsyncLazyPersistThreads();
 
    cacheManager = new FsDatasetCache(this);
 
    // Start the lazy writer once we have built the replica maps.
    lazyWriter = new Daemon(new LazyWriter(conf));
    lazyWriter.start();
    registerMBean(datanode.getDatanodeUuid());
    localFS = FileSystem.getLocal(conf);
    blockPinningEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
      DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
  }
  // Implementation of addVolume
  private void addVolume(Collection<StorageLocation> dataLocations,
      Storage.StorageDirectory sd) throws IOException {
    final File dir = sd.getCurrentDir();
    final StorageType storageType =
        getStorageTypeFromLocations(dataLocations, sd.getRoot());
 
    // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
    // nothing needed to be rolled back to make various data structures, e.g.,
    // storageMap and asyncDiskService, consistent.
    FsVolumeImpl fsVolume = new FsVolumeImpl(
        this, sd.getStorageUuid(), dir, this.conf, storageType);
    FsVolumeReference ref = fsVolume.obtainReference();
    ReplicaMap tempVolumeMap = new ReplicaMap(this);
    fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
    // TODO: lock optimization: split the lock and narrow the locked scope
    synchronized (this) {
      volumeMap.addAll(tempVolumeMap);
      storageMap.put(sd.getStorageUuid(),
          new DatanodeStorage(sd.getStorageUuid(),
              DatanodeStorage.State.NORMAL,
              storageType));
      asyncDiskService.addVolume(sd.getCurrentDir());
      volumes.addVolume(ref);
    }
 
    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
  }

checkDiskError(); removes the failed disks

# DataNode.checkDiskError()
// TODO: optimization: the checks can run in parallel and do not need locking
// f678080dbd25a218e0406463a3c3a1fc03680702	Wed Dec 21 05:53:32 CST 2016	Xiaoyu Yao	HDFS-11182. Update DataNode to use DatasetVolumeChecker. Contributed by Arpit Agarwal.
// eaaa32950cbae42a74e28e3db3f0cdb1ff158119	Wed Nov 30 12:31:02 CST 2016	Arpit Agarwal	HDFS-11149. Support for parallel checking of FsVolumes.
/**
   * Check the disk error
   */
  private void checkDiskError() {
    Set<File> unhealthyDataDirs = data.checkDataDir();
    if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
      try {
        // Remove all unhealthy volumes from DataNode.
        removeVolumes(unhealthyDataDirs, false);
      } catch (IOException e) {
        LOG.warn("Error occurred when removing unhealthy storage dirs: "
            + e.getMessage(), e);
      }
      StringBuilder sb = new StringBuilder("DataNode failed volumes:");
      for (File dataDir : unhealthyDataDirs) {
        sb.append(dataDir.getAbsolutePath() + ";");
      }
      handleDiskError(sb.toString());
    }
  }
 
  # FsDatasetImpl.checkDirs()
  Set<File> checkDirs() {
    synchronized(checkDirsMutex) {
      Set<File> failedVols = null;
 
      // Make a copy of volumes for performing modification
      final List<FsVolumeImpl> volumeList = getVolumes();
 
      for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
        final FsVolumeImpl fsv = i.next();
        try (FsVolumeReference ref = fsv.obtainReference()) {
          fsv.checkDirs();
        } catch (DiskErrorException e) {
          FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
          if (failedVols == null) {
            failedVols = new HashSet<>(1);
          }
          failedVols.add(new File(fsv.getBasePath()).getAbsoluteFile());
          addVolumeFailureInfo(fsv);
          removeVolume(fsv);
        } catch (ClosedChannelException e) {
          FsDatasetImpl.LOG.debug("Caught exception when obtaining " +
            "reference count on closed volume", e);
        } catch (IOException e) {
          FsDatasetImpl.LOG.error("Unexpected IOException", e);
        }
      }
 
      if (failedVols != null && failedVols.size() > 0) {
        FsDatasetImpl.LOG.warn("Completed checkDirs. Found " + failedVols.size()
            + " failure volumes.");
      }
 
      return failedVols;
    }
  }

data.addBlockPool(nsInfo.getBlockPoolID(), conf);

@Override
  public void addBlockPool(String bpid, Configuration conf)
      throws IOException {
    LOG.info("Adding block pool " + bpid);
    synchronized(this) {
      volumes.addBlockPool(bpid, conf);
      volumeMap.initBlockPool(bpid);
    }
    volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
  }

BlockPoolSlice

On what du does here and how it can be optimized:

On Linux, this thread periodically runs du -sk to measure the disk usage of each block pool directory and reports it to the NameNode with the heartbeat.

Running a Linux command forks a child process from the JVM, which is expensive (even though Linux uses copy-on-write to avoid copying the whole address space). To speed up DataNode startup, a previously cached dfsUsage value may be used; it is stored in the dfsUsed file under the current directory. The cached dfsUsage is persisted to disk periodically (fs.du.interval, default 600 seconds), and the current value is also persisted when the JVM shuts down.
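
As a rough illustration of what this means on disk (the path below is only an example and depends on the configured dfs.datanode.data.dir):

# roughly what the DU thread runs for one block pool slice
du -sk /data/hdfs/current/BP-<blockpool-id>/current
# the cached value lives in the dfsUsed file under the same directory and is
# refreshed every fs.du.interval milliseconds (600000 ms = 600 s by default)
cat /data/hdfs/current/BP-<blockpool-id>/current/dfsUsed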

void addBlockPool(final String bpid, final Configuration conf) throws IOException {
    long totalStartTime = Time.monotonicNow();
 
    final List<IOException> exceptions = Collections.synchronizedList(
        new ArrayList<IOException>());
    List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
    for (final FsVolumeImpl v : volumes.get()) {
      Thread t = new Thread() {
        public void run() {
          try (FsVolumeReference ref = v.obtainReference()) {
            FsDatasetImpl.LOG.info("Scanning block pool " + bpid +
                " on volume " + v + "...");
            long startTime = Time.monotonicNow();
            // Note this call
            v.addBlockPool(bpid, conf);
            long timeTaken = Time.monotonicNow() - startTime;
            FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
                " on " + v + ": " + timeTaken + "ms");
          } catch (ClosedChannelException e) {
            // ignore.
          } catch (IOException ioe) {
            FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
                ". Will throw later.", ioe);
            exceptions.add(ioe);
          }
        }
      };
      blockPoolAddingThreads.add(t);
      t.start();
    }
    for (Thread t : blockPoolAddingThreads) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    if (!exceptions.isEmpty()) {
      throw exceptions.get(0);
    }
 
    long totalTimeTaken = Time.monotonicNow() - totalStartTime;
    FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
        bpid + ": " + totalTimeTaken + "ms");
  }
 
  void addBlockPool(String bpid, Configuration conf) throws IOException {
    File bpdir = new File(currentDir, bpid);
    BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
    bpSlices.put(bpid, bp);
  }

volumeMap.initBlockPool(bpid);

Performance improvements and a fix for the lock-usage bug

// 8ae4729107d33c6001cf1fdc8837afb71ea6c0d3	Wed Sep 28 01:02:15 CST 2016	Arpit Agarwal	HDFS-10828. Fix usage of FsDatasetImpl object lock in ReplicaMap. (Arpit Agarwal) HDFS-10682 replaced the FsDatasetImpl object lock with a separate reentrant lock but missed updating an instance ReplicaMap still uses the FsDatasetImpl.
// dd9ebf6eedfd4ff8b3486eae2a446de6b0c7fa8a	Wed Feb 03 03:23:00 CST 2016	Colin Patrick Mccabe	HDFS-9260. Improve the performance and GC friendliness of NameNode startup and full block reports (Staffan Friberg via cmccabe)
// d6fa34e014b0e2a61b24f05dd08ebe12354267fd	Tue Sep 29 16:20:35 CST 2015	yliu	HDFS-8859. Improve DataNode ReplicaMap memory footprint to save about 45%. (yliu)
 
# ReplicaMap.java
void initBlockPool(String bpid) {
    checkBlockPool(bpid);
    synchronized(mutex) {
      Map<Long, ReplicaInfo> m = map.get(bpid);
      if (m == null) {
        // Add an entry for block pool if it does not exist already
        m = new HashMap<Long, ReplicaInfo>();
        map.put(bpid, m);
      }
    }
  }
  // FsDatasetImpl
  static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
      Block block, long recoveryId, long xceiverStopTimeout) throws IOException

volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
Adding a cache

// fc1031af749435dc95efea6745b1b2300ce29446	Thu Mar 26 03:42:59 CST 2015	Kihwal Lee	HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes a lot of time if disks are busy. Contributed by Rushabh Shah.
void getVolumeMap(ReplicaMap volumeMap,
                    final RamDiskReplicaTracker lazyWriteReplicaMap)
      throws IOException {
    // Recover lazy persist replicas, they will be added to the volumeMap
    // when we scan the finalized directory.
    if (lazypersistDir.exists()) {
      int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir);
      FsDatasetImpl.LOG.info(
          "Recovered " + numRecovered + " replicas from " + lazypersistDir);
    }
 
    // add finalized replicas
    addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
    // add rbw replicas
    addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
  }

A qcow2 image customization guide

Background

Information about customizing images is currently scattered across the web, and you have to dig through many articles to get the job done. This guide tries to be comprehensive and systematic; by following it, you should be able to customize an image with little trouble.

Steps

I. Environment setup

1. Prepare the software
Mac Pro, VMware Fusion, CentOS-7-x86_64-DVD-1708.iso, CentOS-7-x86_64-GenericCloud-1708-20180123.qcow2
2. Install a nested CentOS environment
macOS does not support KVM, so the software needed for working on the cloud image has to be installed in a nested OS; with Fusion it is easy to virtualize a CentOS environment on macOS.
3. Adjust the nested OS configuration
With CentOS powered off, open the VM's "Processors & Memory" settings, choose "Advanced options", and tick "Enable hypervisor applications in this virtual machine" and "Enable code profiling applications in this virtual machine". Without this step, starting virt-manager reports the error
"WARNING: KVM is not available. This may be because the KVM package is not installed, or the KVM kernel module is not loaded. Your virtual machines may perform poorly." Then start the VM. Unless noted otherwise, all the following steps are executed inside the nested OS.
4. Install the dependencies

yum install qemu-kvm qemu-img qemu-kvm-tools qemu-kvm-common
yum install libvirt-admin libvirt-client libvirt-daemon libvirt-devel libvirt
yum install libguestfs libguestfs-tools libguestfs-bash-completion

5. Build the nbd kernel module (not needed if you do not modify the image via the nbd mount method)
If the following command fails with the error below, the nbd module is missing and has to be built manually:

modprobe nbd
modprobe: FATAL: Module nbd not found.

Run the following commands to build and install the nbd module

[@xx] # cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core)
[@xx] # uname -a
Linux localhost 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
[] # uname -r
3.10.0-693.el7.x86_64

Install the kernel packages

[@xx] sudo yum install kernel-devel kernel-headers

Build and install the nbd module

# Download the matching kernel source package
[@xx] # wget http://vault.centos.org/7.4.1708/os/Source/SPackages/kernel-3.10.0-693.el7.src.rpm
[@xx] # rpm -ihv kernel-3.10.0-693.el7.src.rpm
[@xx] # cd /root/rpmbuild/SOURCES/
[@xx] # tar Jxvf linux-3.10.0-123.el7.tar.xz -C /usr/src/kernels/
[@xx] # cd /usr/src/kernels/
# Set up the source tree
[@xx] # mv $(uname -r) $(uname -r)-old
[@xx] # mv linux-3.10.0-693.el7 $(uname -r)
# Build and install
[@xx 3.10.0-693.el7.x86_64] # cd $(uname -r)
[@xx 3.10.0-693.el7.x86_64] # make mrproper
[@xx 3.10.0-693.el7.x86_64] # cp ../$(uname -r)-old/Module.symvers ./
[@xx 3.10.0-693.el7.x86_64] # cp /boot/config-$(uname -r) ./.config
[@xx 3.10.0-693.el7.x86_64] # make oldconfig
[@xx 3.10.0-693.el7.x86_64] # make prepare
[@xx 3.10.0-693.el7.x86_64] # make scripts
[@xx 3.10.0-693.el7.x86_64] # make CONFIG_BLK_DEV_NBD=m M=drivers/block
[@xx 3.10.0-693.el7.x86_64] # cp drivers/block/nbd.ko /lib/modules/$(uname -r)/kernel/drivers/block/
[@xx 3.10.0-693.el7.x86_64] # depmod -a
# Inspect the nbd module
[@xx 3.10.0-693.el7.x86_64]$ modinfo nbd
filename:       /lib/modules/3.10.0-693.el7.x86_64/kernel/drivers/block/nbd.ko
license:        GPL
description:    Network Block Device
rhelversion:    7.4
srcversion:     EDE909A294AC5FE08E81957
depends:        
vermagic:       3.10.0 SMP mod_unload modversions 
parm:           nbds_max:number of network block devices to initialize (default: 16) (int)
parm:           max_part:number of partitions per device (default: 0) (int)
parm:           debugflags:flags for controlling debug output (int)

Handling errors during the build
Failing stage: make CONFIG_BLK_DEV_NBD=m M=drivers/block

drivers/block/nbd.c: In function '__nbd_ioctl':
drivers/block/nbd.c:619:19: error: 'REQ_TYPE_SPECIAL' undeclared (first use in this function)
   sreq.cmd_type = REQ_TYPE_SPECIAL;
                   ^
drivers/block/nbd.c:619:19: note: each undeclared identifier is reported only once for each function it appears in
make[1]: *** [drivers/block/nbd.o] Error 1
make: *** [_module_drivers/block] Error 2

Fix:

[@xx 3.10.0-693.el7.x86_64] # vim include/linux/blkdev.h
# From the code we can see that REQ_TYPE_SPECIAL = 7
/*
 * request command types
 */
enum rq_cmd_type_bits {
        REQ_TYPE_FS             = 1,    /* fs request */
        REQ_TYPE_BLOCK_PC,              /* scsi command */
        REQ_TYPE_SENSE,                 /* sense request */
        REQ_TYPE_PM_SUSPEND,            /* suspend request */
        REQ_TYPE_PM_RESUME,             /* resume request */
        REQ_TYPE_PM_SHUTDOWN,           /* shutdown request */
#ifdef __GENKSYMS__
        REQ_TYPE_SPECIAL,               /* driver defined type */
#else
        REQ_TYPE_DRV_PRIV,              /* driver defined type */
#endif
        /*
         * for ATA/ATAPI devices. this really doesn't belong here, ide/ata will
         * use REQ_TYPE_SPECIAL and use rq->cmd[0] with the range of driver
         * private REQ_LB opcodes to differentiate what type of request this is
         */
        REQ_TYPE_ATA_TASKFILE,
        REQ_TYPE_ATA_PC,
};
# Edit nbd.c
[@xx 3.10.0-693.el7.x86_64] # vim drivers/block/nbd.c
# sreq.cmd_type = REQ_TYPE_SPECIAL;
sreq.cmd_type = 7;
# Re-run the build command
[@xx 3.10.0-693.el7.x86_64] # make CONFIG_BLK_DEV_NBD=m M=drivers/block

II. Sharing the image

Set up folder sharing for the nested VM
The qcow2 file is kept in a local folder on the Mac, and the nested VM accesses it through folder sharing. Note that the qcow2 file must be made readable and writable in macOS, otherwise its configuration cannot be updated from inside the nested VM.

[mac@] # chmod 755 ./CentOS-7-x86_64-GenericCloud-1708-20180123.qcow2

Inside the nested VM, SELinux also has to be switched off, otherwise the image content cannot be updated either:

sudo /usr/sbin/setenforce 0
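
To confirm the change (and optionally make it survive a reboot), a small sketch:

getenforce                 # should now print Permissive
# optional: persist it across reboots
sudo sed -i 's/^SELINUX=enforcing/SELINUX=permissive/' /etc/selinux/config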

III. Using the guestfish tool

1. Example: find the image's IP address

guestfish --rw -a ./CentOS-7-x86_64-GenericCloud-1708-20180123.qcow2
# run
# list-filesystems
/dev/sda1: xfs
# mount /dev/sda1 /
# vi /var/log/cloud-init.log

2. Example: configure user access

guestfish --rw -a ./CentOS-7-x86_64-GenericCloud-1708-20180123.qcow2
# run
# list-filesystems
/dev/sda1: xfs
# mount /dev/sda1 /
# mkdir /home/dc2-user/.ssh
# Note the difference between guestfish and the shell: the permission bits here have four digits
# chown 1001 1001 /home/dc2-user/.ssh
# chmod 0700 /home/dc2-user/.ssh
# touch /home/dc2-user/.ssh/authorized_keys
# chmod 0600 /home/dc2-user/.ssh/authorized_keys
# chown 1001 1001 /home/dc2-user/.ssh/authorized_keys
# ll /home/dc2-user/.ssh/authorized_keys
-rw------- 1 1001 1001 0 Mar  1 10:05 /sysroot/home/dc2-user/.ssh/authorized_keys
# vi /home/dc2-user/.ssh/authorized_keys
# add the public key ...
# quit

IV. Modifying the image through an nbd mount (qemu-nbd)

1. Make sure the nbd module is installed, then load it

# modinfo nbd
# insmod /lib/modules/3.10.0-693.el7.x86_64/kernel/drivers/block/nbd.ko max_part=8

3. Establish the nbd connection and mount it on a directory

# qemu-nbd -c /dev/nbd0 ./CentOS-7-x86_64-GenericCloud-1708-20180123.qcow2
# mkdir -p /mnt/qcows/qcow0
# mount /dev/nbd0p1  /mnt/qcows/qcow0

4. chroot into the mount point

# chroot /mnt/qcows/qcow0/

5. Make the changes you need, for example

$ passwd dc2-user
# or other operations

6. When finished, unmount and disconnect

# umount /mnt/qcows/qcow0
# qemu-nbd -d /dev/nbd0

V. Mounting the image as a VM with virt-manager

1. Run

virt-manager

2. Create a new VM
Choose "Import existing disk image", "Use ISO image", and select the qcow2 file...
If it reports the warning "KVM is not available. This may be because the KVM package is not installed, or the KVM kernel module is not loaded. Your virtual machines may perform poorly.", the fix is:

  • Shut down the virtual machine
  • Open the VM settings (where the NIC, disks and CD drive are configured)
  • Click "Processors & Memory" and tick the virtualization option "Intel VT-x/EPT or AMD-V/RVI"

3. Log in to the machine and make the changes
Setting the IP address, the password and public-key login was covered earlier.

VI. Cleaning up traces

1. Clean out the /var/log/ directory
2. Delete the cloud-init execution records
cloud-init is a tool built specifically for initializing virtual machines in cloud environments; it configures the VM from the data it reads. After it runs, it creates semaphore files under a directory named sem so that the statements are not executed again the next time the modules start. The directory is located at:
/var/lib/cloud/instances/<instance-id>/sem

sudo rm -rf /var/lib/cloud/instances
sudo rm -rf /var/lib/cloud/instance

3. Clean the shell history, the .ssh directory, and so on

rm -rf /home/xxx/.ssh
echo '' > /home/xxx/.bash_history
echo '' > /root/.bash_history
 
rm -rf /root/.oracle_jre_usage

VII. Shrinking the image by removing unused space

# Create an image of the same size
$ qemu-img create -f qcow2 CentOS-7-x86_64-GenericCloud-1708-20180329.qcow2 40G
$ virt-sparsify -x ./CentOS-7-x86_64-GenericCloud-1708-20180123.qcow2 --convert qcow2 ./CentOS-7-x86_64-GenericCloud-1708-20180329.qcow2
$ du -sh *
7.3G	CentOS-7-x86_64-GenericCloud-1708-20180123.qcow2
5.3G	CentOS-7-x86_64-GenericCloud-1708-20180329.qcow2

Merging Ambari stack configurations

Goal

Ambari stacks have an inheritance mechanism: a service can inherit from another stack or from common-services. When a service inherits from a service of another version, several rules apply, and different file types follow different rules. Unfortunately, to study the Ambari source code or to customize the implementation and fix problems, you need to know what the fully resolved configuration of a specific service version looks like. The inheritance rules are so complex that merging configurations and scripts by hand is a huge amount of work and there is no way to guarantee that the result is correct; it is practically impossible. A mechanism is needed that merges efficiently while guaranteeing that the merged configuration and scripts are correct.

Approach

The Ambari code already handles the inheritance, so merging configurations and scripts is simply a matter of writing the data of the StackInfo object that Ambari has parsed into memory back out to the corresponding files and directories. The code below implements this configuration merge; to run it, a StackInfo object info must be obtained first.

1. Obtain the StackInfo object

StackManager stackManager = new StackManager(
      new File(stackRoot),
      new File(commonServices), // when commonServices is null, nothing is inherited from common-services
      null,
      osFamily, false,
      metaInfoDao, actionMetadata, stackDao, extensionDao, linkDao, helper);
StackInfo info = stackManager.getStack("HDP", "2.6");

2. Main implementation

First the Ambari internal mapping objects are used to encode the content and write it to files. If that throws an exception (the exception should not be hard to fix; javax.xml.bind.JAXBException: class java.util.LinkedHashMap nor any of its super class is known to this context is not resolved here), the file content is generated with a template instead.

{
    final File stackDirFile = new File("/tmp/" + info.getName() + "/" + info.getVersion());
    stackDirFile.mkdirs();
 
    StackMetainfoXml stackMetainfoXml = new StackMetainfoXml();
    stackMetainfoXml.setValid(info.isValid());
//    stackMetainfoXml.setExtendsVersion(info.getParentStackVersion());
//    stackMetainfoXml.setMinJdk(info.getMinJdk());
//    stackMetainfoXml.setMaxJdk(info.getMaxJdk());
//    stackMetainfoXml.setVersion(info.getVersion());
    //
    {
      {
        String kerberosDescriptorFileLocation = info.getKerberosDescriptorFileLocation();
        if (kerberosDescriptorFileLocation != null) {
          File file = new File(kerberosDescriptorFileLocation);
          Files.copy(file, new File(stackDirFile.getAbsolutePath(), FilenameUtils.getName(file.getAbsolutePath())));
        }
      }
      {
        String kerberosDescriptorPreConfigurationFileLocation = info.getKerberosDescriptorPreConfigurationFileLocation();
        if (kerberosDescriptorPreConfigurationFileLocation != null) {
          File file = new File(kerberosDescriptorPreConfigurationFileLocation);
          Files.copy(file, new File(stackDirFile.getAbsolutePath(), FilenameUtils.getName(file.getAbsolutePath())));
        }
      }
      {
        String widgetsDescriptorFileLocation = info.getWidgetsDescriptorFileLocation();
        if (widgetsDescriptorFileLocation != null) {
          File file = new File(widgetsDescriptorFileLocation);
          Files.copy(file, new File(stackDirFile.getAbsolutePath(), FilenameUtils.getName(file.getAbsolutePath())));
        }
      }
      {
        StackRoleCommandOrder roleCommandOrder = info.getRoleCommandOrder();
        String value = new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(roleCommandOrder.getContent());
        Files.write(value, new File(stackDirFile.getAbsolutePath(), "role_command_order.json"), Charset.forName("UTF-8"));
      }
    }
 
    // service
    Collection serviceInfos = info.getServices();
    for (ServiceInfo serviceInfo : serviceInfos) {
      String name = serviceInfo.getName();
      File serviceDir = new File(stackDirFile.getAbsolutePath() + "/" + "services", name);
      serviceDir.mkdirs();
 
      {
        File kerberosDescriptorFile = serviceInfo.getKerberosDescriptorFile();
        File widgetsDescriptorFile = serviceInfo.getWidgetsDescriptorFile();
        File metricsFile = serviceInfo.getMetricsFile();
 
        if (kerberosDescriptorFile != null) {
          Files.copy(kerberosDescriptorFile, new File(serviceDir.getAbsolutePath(), FilenameUtils.getName(kerberosDescriptorFile.getAbsolutePath())));
        }
        if (widgetsDescriptorFile != null) {
          Files.copy(widgetsDescriptorFile, new File(serviceDir.getAbsolutePath(), serviceInfo.getWidgetsFileName()));
        }
        if (metricsFile != null) {
          Files.copy(metricsFile, new File(serviceDir.getAbsolutePath(), serviceInfo.getMetricsFileName()));
        }
        {
          StackRoleCommandOrder roleCommandOrder = info.getRoleCommandOrder();
          String value = new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(roleCommandOrder.getContent());
          Files.write(value, new File(serviceDir.getAbsolutePath(), "role_command_order.json"), Charset.forName("UTF-8"));
        }
      }
 
      if (serviceInfo.getQuickLinksConfigurationsMap() != null && !serviceInfo.getQuickLinksConfigurationsMap().isEmpty()) {
        File dir = new File(serviceDir.getAbsolutePath(), serviceInfo.getQuickLinksConfigurationsDir());
        dir.mkdirs();
 
        List infos = Lists.newArrayList();
        for (Map.Entry<String, QuickLinksConfigurationInfo> entry : serviceInfo.getQuickLinksConfigurationsMap().entrySet()) {
          infos.add(entry.getValue());
 
          ObjectMapper objectMapper = new ObjectMapper();
          ObjectWriter writer = objectMapper.writerWithDefaultPrettyPrinter();
 
          QuickLinksConfigurationInfo configurationInfo = entry.getValue();
          QuickLinks quickLinks = configurationInfo.getQuickLinksConfigurationMap().values().iterator().next();
          String value = writer.writeValueAsString(quickLinks);
          Files.write(value, new File(dir.getAbsolutePath(), entry.getKey()), Charset.forName("UTF-8"));
        }
        serviceInfo.setQuickLinksConfigurations(infos);
      }
 
      if (serviceInfo.getThemesMap() != null && !serviceInfo.getThemesMap().isEmpty()) {
        File themeDir = new File(serviceDir.getAbsolutePath(), serviceInfo.getThemesDir());
        themeDir.mkdirs();
 
        List themes = Lists.newArrayList();
        for (Map.Entry<String, ThemeInfo> entry : serviceInfo.getThemesMap().entrySet()) {
          themes.add(entry.getValue());
 
          ObjectMapper objectMapper = new ObjectMapper();
          ObjectWriter writer = objectMapper.writerWithDefaultPrettyPrinter();
 
          ThemeInfo themeInfo = entry.getValue();
          Theme theme = themeInfo.getThemeMap().values().iterator().next();
          String value = writer.writeValueAsString(theme);
          Files.write(value, new File(themeDir.getAbsolutePath(), entry.getKey()), Charset.forName("UTF-8"));
        }
        serviceInfo.setThemes(themes);
      }
 
      // Generate the configuration files
      List properties = serviceInfo.getProperties();
      Set pFileNames = Sets.newLinkedHashSet();
      Set pFileSortNames = Sets.newLinkedHashSet();
      Map<String, List> configurationFiles = Maps.newLinkedHashMap();
      // configuration
      for (PropertyInfo propertyInfo : properties) {
        String filename = propertyInfo.getFilename();
        pFileNames.add(filename);
        pFileSortNames.add(FilenameUtils.getBaseName(filename));
 
        List propertyInfos = configurationFiles.get(filename);
        if (propertyInfos == null) {
          configurationFiles.put(filename, new ArrayList());
          propertyInfos = configurationFiles.get(filename);
        }
        propertyInfos.add(propertyInfo);
      }
      if (configurationFiles.size() > 0) {
        File configDir = new File(serviceDir.getAbsolutePath() + "/configuration");
        configDir.mkdirs();
 
        for (Map.Entry<String, List> entry : configurationFiles.entrySet()) {
          String fileName = entry.getKey();
          String shortName = FilenameUtils.getBaseName(fileName);
          {// configuration
            ConfigurationXml configurationXml = new ConfigurationXml();
            configurationXml.addAll(entry.getValue());
            Map<QName, String> attri = Maps.newLinkedHashMap();
//            attri.put(new QName("configuration.xsl", "text/xsl", "kkkk"), "yyyy");
            // configuration xmlns:kkkk="configuration.xsl" kkkk:text/xsl="yyyy">
            Map<String, String> supports = serviceInfo.getConfigTypeAttributes().get(shortName).get("supports");
 
            attri.put(new QName(null, "supports_adding_forbidden", ""), supports.get("adding_forbidden"));
            attri.put(new QName(null, "supports_final", ""), supports.get("final"));
            attri.put(new QName(null, "supports_do_not_extend", ""), supports.get("do_not_extend"));
            configurationXml.addAll(attri);
            FileOutputStream stream = null;
            try {
              stream = new FileOutputStream(new File(configDir.getAbsolutePath(), fileName));
              JAXBContext jc = JAXBContext.newInstance(ConfigurationXml.class);
              Marshaller ms = jc.createMarshaller();
              ms.setProperty(Marshaller.JAXB_ENCODING, "UTF-8"); // output encoding
              ms.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
              ms.setProperty(Marshaller.JAXB_FRAGMENT, false);
              ms.setProperty("com.sun.xml.bind.xmlHeaders", "\n<!--?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?-->\n" +
                "<!--\n" + "/**\n" + " * Licensed to the Apache Software Foundation (ASF) under one\n" + " * or more contributor license agreements. See the NOTICE file\n" + " * distributed with this work for additional information\n" + " * regarding copyright ownership. The ASF licenses this file\n" + " * to you under the Apache License, Version 2.0 (the\n" + " * \"License\"); you may not use this file except in compliance\n" + " * with the License. You may obtain a copy of the License at\n" + " *\n" + " * http://www.apache.org/licenses/LICENSE-2.0\n" + " *\n" + " * Unless required by applicable law or agreed to in writing, software\n" + " * distributed under the License is distributed on an \"AS IS\" BASIS,\n" + " * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" + " * See the License for the specific language governing permissions and\n" + " * limitations under the License.\n" + " */\n" + "-->\n");
 
              ms.marshal(configurationXml, stream);
            } catch (JAXBException e) {
//        e.printStackTrace();
              new File(stackDirFile.getAbsolutePath() + "/" + "configuration_0").mkdirs();
              FileOutputStream outputStream = new FileOutputStream(new File(stackDirFile.getAbsolutePath() + "/" + "configuration_0", name));
              e.printStackTrace(new PrintStream(outputStream, true));
              outputStream.close();
            } finally {
              Closeables.close(stream, true);
            }
          }
//          {
//            // configuration Deprecate
//            VelocityContext context = new VelocityContext();
//            List propertyInfos = entry.getValue();
//            propertyInfos = Lists.transform(propertyInfos, new Function<PropertyInfo, PropertyInfo>() {
//              public PropertyInfo apply(PropertyInfo input) {
//                String value = input.getValue();
//                if (value != null && (value.contains("<") || value.contains(">") || value.contains("/") || value.length() > 1024)) {
//                  input.setValue(String.format("<![CDATA[%s]]>", value));
//                }
//                return input;
//              }
//            });
//            context.put("properties", propertyInfos);
//            context.put("configType", serviceInfo.getConfigTypeAttributes().get(shortName));
//            Template template = VelocityUtil.getTemplateByName("configuration");
//            FileWriter writer = new FileWriter(new File(serviceDir.getAbsolutePath() + "/configuration", fileName));
//            template.merge(context, writer);
//            writer.flush();
//            writer.close();
//          }
        }
      }
//      FileWriter fileWriter = new FileWriter("/tmp/HDP/server_" + serviceInfo.getName());
//      GsonBuilder gsonBuilder = new GsonBuilder().setPrettyPrinting();
//      fileWriter.write(gsonBuilder.create().toJson(serviceInfo));
//      fileWriter.close();
      // Generate metainfo.xml
      FileOutputStream stream = null;
      try {
        JAXBContext jc = JAXBContext.newInstance(ServiceMetainfoXml.class);
        Marshaller ms = jc.createMarshaller();
        ms.setProperty(Marshaller.JAXB_ENCODING, "UTF-8"); // output encoding
        ms.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
        ms.setProperty(Marshaller.JAXB_FRAGMENT, false);
        ms.setProperty("com.sun.xml.bind.xmlHeaders", "\n\n" +
          "<!--\n" + " Licensed to the Apache Software Foundation (ASF) under one or more\n" + " contributor license agreements. See the NOTICE file distributed with\n" + " this work for additional information regarding copyright ownership.\n" + " The ASF licenses this file to You under the Apache License, Version 2.0\n" + " (the \"License\"); you may not use this file except in compliance with\n" + " the License. You may obtain a copy of the License at\n" + "\n" + " http://www.apache.org/licenses/LICENSE-2.0\n" + "\n" + " Unless required by applicable law or agreed to in writing, software\n" + " distributed under the License is distributed on an \"AS IS\" BASIS,\n" + " WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" + " See the License for the specific language governing permissions and\n" + " limitations under the License.\n" + "-->\n");
 
        ServiceMetainfoXml serviceMetainfoXml = new ServiceMetainfoXml();
        serviceMetainfoXml.setServices(Lists.newArrayList(serviceInfo));
        serviceMetainfoXml.setSchemaVersion("2.0");
        serviceMetainfoXml.setValid(serviceInfo.isValid());
 
        stream = new FileOutputStream(new File(serviceDir.getAbsolutePath(), "metainfo.xml"));
        ms.marshal(serviceMetainfoXml, stream);
      } catch (JAXBException e) {
//        e.printStackTrace();
        new File(stackDirFile.getAbsolutePath() + "/" + "services_2").mkdirs();
        FileOutputStream outputStream = new FileOutputStream(new File(stackDirFile.getAbsolutePath() + "/" + "services_2", name));
        e.printStackTrace(new PrintStream(outputStream, true));
        outputStream.close();
 
        {// @Deprecate
          // metainfo.xml (deprecated Velocity fallback)
          VelocityContext context = new VelocityContext();
          context.put("serviceInfo", serviceInfo);
          context.put("pFileNames", pFileNames);
          context.put("pFileSortNames", pFileSortNames);
          Template template = VelocityUtil.getTemplateByName("metainfo");
          FileWriter writer = new FileWriter(new File(serviceDir.getAbsolutePath(), "metainfo.xml"));
          template.merge(context, writer);
          writer.flush();
          writer.close();
        }
      } finally {
        Closeables.close(stream, true);
      }
    }
  }
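
Taken together, this block writes an in-memory ServiceInfo back to disk in the stack layout Ambari expects: quick-links and theme definitions are serialized as pretty-printed JSON with Jackson, each configuration type is marshalled to configuration/*.xml with JAXB (prefixed with the Apache license header and the configuration.xsl stylesheet instruction), and metainfo.xml is produced the same way; the commented-out Velocity path is kept only as a deprecated fallback that runs when JAXB marshalling fails.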

Velocity template utility class

public final class VelocityUtil {
  private static final Logger LOGGER = LoggerFactory.getLogger(VelocityUtil.class);
 
  private VelocityUtil() {/**/}
 
  private static final VelocityEngine VELOCITY_ENGINE;
 
  static {
    VELOCITY_ENGINE = new VelocityEngine();
    VELOCITY_ENGINE.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath");
    VELOCITY_ENGINE.setProperty("userdirective", "org.apache.ambari.server.utils.Ifnull");
    VELOCITY_ENGINE.setProperty("userdirective", "org.apache.ambari.server.utils.Ifnotnull");
    VELOCITY_ENGINE.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
    VELOCITY_ENGINE.init();
  }
 
  public static Template getTemplateByName(String templateName) {
    return VELOCITY_ENGINE.getTemplate("velocity/" + templateName + ".vm", "utf8");
  }
 
  public static String getContentByTemplate(String templateName, Map<String, Object> datas) {
    try {
      Template template = VELOCITY_ENGINE.getTemplate("velocity/" + templateName + ".vm", "utf8");
      VelocityContext context = new VelocityContext();
      for (Map.Entry<String, Object> data : datas.entrySet()) {
        context.put(data.getKey(), data.getValue());
      }
      StringWriter writer = new StringWriter();
      template.merge(context, writer);
      return writer.toString();
    } catch (Exception e) {
      LOGGER.error("对应模板文件未找到", e);
    }
    return null;
  }
 
}
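
A minimal usage sketch for VelocityUtil. It assumes a hypothetical template velocity/hello.vm containing "Hello $name" is available on the classpath, which matches the loading convention getContentByTemplate uses:

import java.util.LinkedHashMap;
import java.util.Map;

public class VelocityUtilDemo {
  public static void main(String[] args) {
    // Context entries become $name, $version, ... inside the template.
    Map<String, Object> data = new LinkedHashMap<>();
    data.put("name", "HDP");
    // Loads velocity/hello.vm from the classpath and merges the context into it.
    String rendered = VelocityUtil.getContentByTemplate("hello", data);
    System.out.println(rendered);   // prints "Hello HDP" when the template exists
  }
}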

Template: configuration.vm

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ -->
#macro(output $__p_page, $box)
    #ifnotnull($__p_page)
        #if($__p_page != "")
            <$!{box}>$__p_page
        #end
    #end
#end
 
 
#foreach($entry in $properties)
 
        $!{entry.name}
        $!{entry.value}
        $!{entry.description}
        $!{entry.displayName}
        $!{entry.filename}
        $!{entry.deleted}
        $!{entry.deleted}
 
        #ifnotnull($entry.propertyAmbariUpgradeBehavior)
 
        #end
        #if($entry.propertyTypes.size()>0)
            #foreach($ept in $entry.propertyTypes)$!{ept}#end
        #end
        #ifnotnull($entry.propertyValueAttributes)
 
            #output("$!{entry.propertyValueAttributes.type}", "type")
            #output("$!{entry.propertyValueAttributes.minimum}", "minimum")
            #output("$!{entry.propertyValueAttributes.maximum}", "maximum")
            #output("$!{entry.propertyValueAttributes.unit}", "unit")
            #output("$!{entry.propertyValueAttributes.delete}", "delete")
            #output("$!{entry.propertyValueAttributes.visible}", "visible")
            #output("$!{entry.propertyValueAttributes.overridable}", "overridable")
            #output("$!{entry.propertyValueAttributes.copy}", "copy")
            #output("$!{entry.propertyValueAttributes.emptyValueValid}", "empty-value-valid")
            #output("$!{entry.propertyValueAttributes.uiOnlyProperty}", "ui-only-property")
            #output("$!{entry.propertyValueAttributes.readOnly}", "readOnly")
            #output("$!{entry.propertyValueAttributes.editableOnlyAtInstall}", "editable-only-at-install")
            #output("$!{entry.propertyValueAttributes.showPropertyName}", "show-property-name")
            #output("$!{entry.propertyValueAttributes.incrementStep}", "increment-step")
            #output("$!{entry.propertyValueAttributes.keyStore}", "keyStore")
 
            #if($entry.propertyValueAttributes.entries.size()>0)
 
                #foreach($pvae in $entry.propertyValueAttributes.entries)
 
                        #output("$!{pvae.value}", "value")
                        #output("$!{pvae.label}", "label")
                        #output("$!{pvae.description}", "description")
 
                #end
 
            #end
            #output("$!{entry.propertyValueAttributes.hidden}", "hidden")
            #output("$!{entry.propertyValueAttributes.entriesEditable}", "entries_editable")
            #output("$!{entry.propertyValueAttributes.selectionCardinality}", "selection-cardinality")
            #output("$!{entry.propertyValueAttributes.propertyFileName}", "property-file-name")
            #output("$!{entry.propertyValueAttributes.propertyFileType}", "property-file-type")
            #output("$!{entry.propertyValueAttributes.propertyFileType}", "property-file-type")
            #if($entry.propertyValueAttributes.userGroupEntries.size()>0)
 
                #foreach($pvae in $entry.propertyValueAttributes.userGroupEntries)
 
                        #output("$!{pvae.value}", "value")
                        #output("$!{pvae.label}", "label")
 
                #end
 
            #end
 
        #end
        #if($entry.dependsOnProperties.size()>0)
 
            #foreach($edop in $entry.dependsOnProperties)
 
                    #output("$!{pvae.name}", "name")
                    #output("$!{pvae.type}", "type")
 
            #end
 
        #end
        #if($entry.dependedByProperties.size()>0)
 
            #foreach($edop in $entry.dependedByProperties)
 
                    #output("$!{pvae.name}", "name")
                    #output("$!{pvae.type}", "type")
 
            #end
 
        #end
        #if($entry.usedByProperties.size()>0)
 
            #foreach($edop in $entry.usedByProperties)
 
                    #output("$!{pvae.name}", "name")
                    #output("$!{pvae.type}", "type")
 
            #end
 
        #end
 
#end

Template: metainfo.vm

<?xml version="1.0"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -->
#macro(output $__p_page, $box)
    #ifnotnull($__p_page)
        #if($__p_page != "")
        <$!{box}>$__p_page
        #end
    #end
#end
#set($service = "service")
#set($services = "services")
 
    $!{serviceInfo.schemaVersion}
    <${services}>
        <${service}>
            #output("$!{serviceInfo.name}", "name")
            #output("$!{serviceInfo.displayName}", "displayName")
            #output("$!{serviceInfo.version}", "version")
            #output("$!{serviceInfo.comment}", "comment")
            #output("$!{serviceInfo.serviceType}", "serviceType")
            #output("$!{serviceInfo.selection}", "selection")
 
            #ifnotnull($serviceInfo.credentialStoreInfo)
 
                #output("$!{serviceInfo.credentialStoreInfo.supported}", "supported")
                #output("$!{serviceInfo.credentialStoreInfo.enabled}", "enabled")
                #output("$!{serviceInfo.credentialStoreInfo.required}", "required")
 
            #end
 
            #if($serviceInfo.components.size()>0)
 
                    #foreach($entry in $serviceInfo.components)
 
                            #output("$!{entry.name}", "name")
                            #output("$!{entry.displayName}", "displayName")
                            #output("$!{entry.category}", "category")
                            #output("$!{entry.deleted}", "deleted")
                            #output("$!{entry.cardinality}", "cardinality")
                            #output("$!{entry.versionAdvertisedField}", "versionAdvertised")
                            #output("$!{entry.versionAdvertisedInternal}", "versionAdvertisedInternal")
                            #output("$!{entry.unlimitedKeyJCERequired}", "unlimitedKeyJCERequired")
                            #output("$!{entry.rollingRestartSupported}", "rollingRestartSupported")
 
                            #ifnotnull($entry.commandScript)
 
                                    #output("$!{entry.commandScript.script}", "script")
                                    #output("$!{entry.commandScript.scriptType}", "scriptType")
                                    #output("$!{entry.commandScript.timeout}", "timeout")
 
                            #end
                            #if($entry.logs.size()>0)
 
                                    #foreach($log in $entry.logs)
 
                                            #output("$!{log.logId}", "logId")
                                            #output("$!{log.primary}", "primary")
 
                                    #end
 
                            #end
                            #if($entry.clientsToUpdateConfigs.size()>0)
 
                                    #foreach($client in $entry.clientsToUpdateConfigs)
                                        #output("$!{client}", "client")
                                    #end
 
                            #end
                            #if($entry.configFiles.size()>0)
 
                                    #foreach($configFile in $entry.configFiles)
 
                                            #output("$!{configFile.type}", "type")
                                            #output("$!{configFile.fileName}", "fileName")
                                            #output("$!{configFile.dictionaryName}", "dictionaryName")
                                            #output("$!{configFile.optional}", "optional")
 
                                    #end
 
                            #end
                            #if($entry.customCommands.size()>0)
 
                                    #foreach($customCommand in $entry.customCommands)
 
                                            #output("$!{customCommand.name}", "name")
                                            #output("$!{customCommand.background}", "background")
 
                                            #ifnotnull($customCommand.commandScript)
 
                                                #output("$!{customCommand.commandScript.script}", "script")
                                                #output("$!{customCommand.commandScript.scriptType}", "scriptType")
                                                #output("$!{customCommand.commandScript.timeout}", "timeout")
 
                                            #end
 
                                    #end
 
                            #end
 
                            #ifnotnull($entry.bulkCommandDefinition)
 
                                    #output("$!{entry.bulkCommandDefinition.displayName}", "displayName")
                                    #output("$!{entry.bulkCommandDefinition.masterComponent}", "masterComponent")
 
                            #end
 
                            #if($entry.dependencies.size()>0)
 
                                    #foreach($dependency in $entry.dependencies)
                                        #output("$!{dependency.name}", "name")
                                        #output("$!{dependency.scope}", "scope")
                                        #output("$!{dependency.serviceName}", "serviceName")
                                        #output("$!{dependency.componentName}", "componentName")
                                        #ifnotnull($dependency.autoDeploy)
 
                                            #output("$!{dependency.autoDeploy.m_enabled}", "enabled")
                                            #output("$!{dependency.autoDeploy.m_coLocate}", "co-locate")
 
                                        #end
                                    #end
 
                            #end
                            #if($entry.configDependencies.size()>0)
 
                                    #foreach($dependency in $entry.configDependencies)
                                        #output("$!{dependency}", "config-type")
                                    #end
 
                            #end
 
                            #ifnotnull($entry.autoDeploy)
 
                                #output("$!{entry.autoDeploy.m_enabled}", "extended")
                                #output("$!{entry.autoDeploy.m_coLocate}", "co-locate")
 
                            #end
 
                            #output("$!{entry.recoveryEnabled}", "recovery_enabled")
                            #output("$!{entry.reassignAllowed}", "reassignAllowed")
                            #output("$!{entry.timelineAppid}", "timelineAppid")
                            #output("$!{entry.customFolder}", "customFolder")
 
                    #end
 
            #end
            #output("$!{serviceInfo.parent}", "extends")
            #output("$!{serviceInfo.widgetsFileName}", "widgetsFileName")
            #output("$!{serviceInfo.metricsFileName}", "metricsFileName")
 
            #output("$!{serviceInfo.isDeleted}", "deleted")
 
            #if($serviceInfo.configLayout.size()>0)
 
                    #foreach($layout in $serviceInfo.configLayout.entrySet())
                    <$!{layout.key}>
                        $!{layout.value}
 
                    #end
 
            #end
            #if($serviceInfo.configDependencies.size()>0)
 
                    #foreach($depend in $serviceInfo.configDependencies)
                        #output("$!{depend}", "config-type")
                    #end
 
            #end
            #if($serviceInfo.excludedConfigTypes.size()>0)
 
                    #foreach($depend in $serviceInfo.excludedConfigTypes)
                        #output("$!{depend}", "config-type")
                    #end
 
            #end
            #if($serviceInfo.configTypes.size()>0)
 
                    #foreach($ctype in $serviceInfo.configTypes.entrySet())
                        <$!{ctype.key}>
                            #foreach($cv in $ctype.value.entrySet())
                                <$!{cv.key}>$!{cv.value}
                            #end
 
                    #end
 
            #end
 
##            #if($pFileSortNames.size()>0) // TODO: xxx
##                
##                    #foreach($name in $pFileSortNames)
##                        #output("$!{name}", "config-type")
##                    #end
##                
##            #end
##
            #if($serviceInfo.requiredServices.size()>0)
 
                #foreach($name in $serviceInfo.requiredServices)
                    #output("$!{name}", "service")
                #end
 
            #end
##
            #if($serviceInfo.themesMap.size()>0)
 
                #foreach($entry in $serviceInfo.themesMap.entrySet())
 
                        #output("$!{entry.key}", "fileName")
                        #output("$!{entry.value.isDefault}", "default")
                        #output("$!{entry.value.deleted}", "deleted")
 
                #end
 
            #end
 
    #if($serviceInfo.themesMap.size()>0)
 
            #foreach($entry in $serviceInfo.themesMap.entrySet())
 
                    #output("$!{entry.key}", "fileName")
                        #output("$!{entry.value.isDefault}", "default")
                        #output("$!{entry.value.deleted}", "deleted")
 
            #end
 
    #end
Uncategorized

Sqoop Source Code Analysis

Sqoop implements MySQL data export in two ways: fetching the data from MySQL over JDBC, or fetching it with the mysqldump command. JDBC is the default; to use the dump approach, add the --direct parameter.

Fetching data from MySQL via JDBC

When writing the query statement, you must include the $CONDITIONS placeholder, for example: SELECT id FROM user WHERE $CONDITIONS. Sqoop's internal implementation replaces it with whatever query condition it needs.

After starting, Sqoop first queries metadata: it replaces $CONDITIONS with (1=0) and uses the resulting SQL to read the table's meta information. For a single-table export, Sqoop runs this SQL against the database three times: 1) to fetch colInfo (from which the columnTypes information is ultimately derived); 2) to query the ColumnNames information; 3) to obtain columnTypeNames while generating the QueryResult class and executing generateFields.
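
A minimal sketch of that substitution (my own illustration, not Sqoop's actual code): the probe query has the same shape as the user's query but returns no rows, so only the result-set metadata is paid for:

public class ConditionsProbeSketch {
  public static void main(String[] args) {
    // The user-supplied query with the mandatory placeholder.
    String userQuery = "SELECT id, name FROM user WHERE $CONDITIONS";
    // For the metadata probes, $CONDITIONS becomes an always-false predicate.
    String metaQuery = userQuery.replace("$CONDITIONS", "(1 = 0)");
    // Prints: SELECT id, name FROM user WHERE (1 = 0) -- zero rows, full column metadata.
    System.out.println(metaQuery);
  }
}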

Sqoop validates the fetched fields (column names must not repeat) and also handles the conversion from database column names to Java property names.

The QueryResult class is produced by building a Java source file, obtaining a JavaCompiler, then compiling and loading it; it is not implemented with reflection, for performance. The generated class internally handles null values and field delimiters when moving data from MySQL to HDFS.

Next it runs another SQL query whose result set is MIN(split column), MAX(split column). Here the handling is to replace $CONDITIONS with (1=1) and wrap the statement in an outer SELECT (for example: SELECT MIN(id), MAX(id) FROM (SELECT ID, NAME, PASSPORT FROM user WHERE (1=1)) AS t1), which yields the maximum and minimum split-column values for this export.

For split columns of integer, boolean, date/time, or float type, the split is computed directly from the magnitude of the values. Text columns are handled specially: Sqoop first finds the longest common prefix of the Min and Max strings obtained above, converts the remaining suffixes to BigDecimal treating each char as two bytes (a radix of 65536), and splits numerically; the algorithm lives in the TextSplitter class and is fairly simple, essentially a base conversion. After splitting, the split values are converted back to String and the common prefix is prepended, which forms the query ranges (note that Chinese text may be cut at odd boundaries).
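
A simplified sketch of the base-65536 idea (my own illustration, not the real TextSplitter code): each character after the common prefix is treated as one digit of a fraction, the numeric interval is split, and the split points are mapped back to strings:

import java.math.BigDecimal;

public class TextSplitSketch {
  private static final BigDecimal ONE_PLACE = BigDecimal.valueOf(65536);
  private static final int MAX_CHARS = 8;   // cap the conversion depth

  // Interpret a string (the part after the common prefix) as a base-65536 fraction in [0, 1).
  static BigDecimal stringToBigDecimal(String s) {
    BigDecimal result = BigDecimal.ZERO;
    BigDecimal place = BigDecimal.ONE.divide(ONE_PLACE);   // exact: 65536 is a power of two
    for (int i = 0; i < s.length() && i < MAX_CHARS; i++) {
      result = result.add(place.multiply(BigDecimal.valueOf(s.charAt(i))));
      place = place.divide(ONE_PLACE);
    }
    return result;
  }

  // Inverse conversion: turn a split point back into a string; the common prefix is prepended afterwards.
  static String bigDecimalToString(BigDecimal d) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < MAX_CHARS; i++) {
      d = d.multiply(ONE_PLACE);
      int digit = d.intValue();
      if (digit == 0) {
        break;
      }
      sb.append((char) digit);
      d = d.subtract(BigDecimal.valueOf(digit));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // The midpoint between "ab" and "az", mapped back to a string that sorts between them.
    BigDecimal mid = stringToBigDecimal("ab").add(stringToBigDecimal("az")).divide(BigDecimal.valueOf(2));
    System.out.println(bigDecimalToString(mid));   // prints "an"
  }
}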

Sqoop fetches the actual data in DataDrivenDBRecordReader: at query time $CONDITIONS is replaced with the split's range, e.g. (id >= 1) && (id < 10); a JDBC result-set cursor is obtained and then advanced to process the data.
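
Roughly what each map task ends up doing (an illustrative sketch, not DataDrivenDBRecordReader itself; the JDBC URL and credentials are made up): run the split-scoped query and stream the rows through a forward-only cursor:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class SplitReaderSketch {
  public static void main(String[] args) throws Exception {
    String splitQuery = "SELECT id, name FROM user WHERE ( id >= 1 ) AND ( id < 10 )";
    try (Connection conn = DriverManager.getConnection("jdbc:mysql://node-1:3306/test", "work", "secret");
         PreparedStatement ps = conn.prepareStatement(splitQuery,
             ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
      ps.setFetchSize(Integer.MIN_VALUE);   // MySQL driver hint: stream rows instead of buffering them all
      try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
          // A real reader hands each row to the generated record class for writing to HDFS.
          System.out.println(rs.getLong("id") + "\t" + rs.getString("name"));
        }
      }
    }
  }
}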

Fetching data from MySQL with the mysqldump command

The second approach differs from the first in the following ways:

When initializing metadata, it appends LIMIT 1 to the constructed query, for example SELECT t.* FROM <table> AS t LIMIT 1, because the dump approach selects columns as t.*; with LIMIT 0 the database would not return the metadata it needs.

With the dump approach, data is fetched in the map task: it builds a mysqldump command, invokes it from the Java program, and takes hold of the stdout/stderr streams; it implements the org.apache.sqoop.util.AsyncSink abstract class to handle those streams.
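
A minimal sketch of the AsyncSink idea (illustrative only; the class and the mysqldump flags here are assumptions, not Sqoop's actual code): each stream of the forked mysqldump process is drained on its own thread so the child never blocks on a full pipe buffer:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class StreamSinkSketch {
  // Drain one stream on a background thread; a real sink would parse or forward the data.
  static Thread sink(InputStream in, String tag) {
    Thread t = new Thread(() -> {
      try (BufferedReader r = new BufferedReader(new InputStreamReader(in))) {
        String line;
        while ((line = r.readLine()) != null) {
          System.out.println("[" + tag + "] " + line);
        }
      } catch (IOException ignored) {
        // stream closes when the process exits
      }
    });
    t.start();
    return t;
  }

  public static void main(String[] args) throws Exception {
    Process proc = new ProcessBuilder("mysqldump", "--quick", "test", "user").start();
    Thread out = sink(proc.getInputStream(), "stdout");
    Thread err = sink(proc.getErrorStream(), "stderr");
    proc.waitFor();
    out.join();
    err.join();
  }
}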

Optimization strategies:

Sqoop issues the same SQL query (which returns no data) three times; the queries could be merged, but since they return only meta information they are fast, so the implementation does not need to change.

The choice of split column strongly affects both the metadata queries and the export queries. Tune the indexes to avoid sorting on the split column, which speeds up metadata lookup as well as data export; prefer an auto-increment primary key ID as the split column, since it has good selectivity and lets the data be read sequentially.

In the export queries, $CONDITIONS is replaced with a range predicate, so take this query into account when designing indexes.

Index advice: follow three rules (keep the queried data set small, reduce point lookups, avoid sort operations). In the Sqoop scenario, if the split column is not the (auto-increment) primary key, make it the first field of a composite index and use the other selected query conditions as the remaining index fields.

When choosing the split column, avoid data skew after splitting.

Looking at the implementation, the -m parameter does increase task parallelism, but the database has a fixed number of read threads, so an overly large -m puts pressure on the database; in the Sqoop scenario the database is the bottleneck that limits concurrency, and adding more jobs gains little.

Reading the Sqoop source (version 1.4.6 at the time of writing) reveals several fairly serious problems, listed below.

Problem 1: data splitting is tied to the mapper setting

Sqoop accepts the -m parameter at extraction time. It controls the number of mappers, but it also determines how many output files are produced, so tuning it is the only way to control result-file size. The problem is that if the produced file format is not splittable, downstream jobs suffer badly; at the same time Sqoop launches -m map tasks, which means m concurrent reads against the database. In my view, as a data export framework Sqoop should control the data more finely: -m should only set the MapReduce parallelism, and the number of data splits should be governed by a separate parameter.

My suggestion is to add a -split-per-map parameter (sketched below): with -m=4 -split-per-map=2, the result set would be divided into 8 splits, each mapper would process two of them, and 8 files would be produced in total.
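
A rough sketch of how the proposed (hypothetical) -split-per-map parameter could behave: the number of splits becomes numMappers * splitsPerMap, so output-file size can be tuned independently of database read concurrency:

import java.util.ArrayList;
import java.util.List;

public class SplitPerMapSketch {
  // Divide [min, max] into roughly numMappers * splitsPerMap inclusive ranges.
  static List<long[]> splits(long min, long max, int numMappers, int splitsPerMap) {
    int numSplits = numMappers * splitsPerMap;
    long span = Math.max(1, (max - min + 1) / numSplits);
    List<long[]> result = new ArrayList<>();
    for (long lo = min; lo <= max; lo += span) {
      result.add(new long[] {lo, Math.min(lo + span - 1, max)});
    }
    return result;
  }

  public static void main(String[] args) {
    // -m=4 -split-per-map=2 over ids 1..800 -> 8 ranges of 100 ids each.
    splits(1, 800, 4, 2).forEach(r -> System.out.println(r[0] + " - " + r[1]));
  }
}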

Problem 2: split computation is inefficient

Sqoop's split handling is problematic. It computes the split information with a query of the form "SELECT MAX(splitKey), MIN(splitKey) FROM ( <the user's select statement> ) AS t1". In MySQL such a query materializes a temporary table keyed by the split column and containing all the other exported columns. If the data volume is small, the temporary table fits in memory and processing stays fast; once it no longer fits, it is written to disk as a MyISAM table; and if even the disk file cannot hold it, the split fails. For large tables this query can account for up to 45% of the total export time, so there is a lot of room for optimization; without changing the implementation, Sqoop is not well suited to full exports of very large tables.

Solution 1:

Modify every export script and supply a custom split (boundary) query.

Solution 2:

Modify the following method of org.apache.sqoop.mapreduce.DataDrivenImportJob:

@Contract("null, _ -> !null")
private String buildBoundaryQuery(String col, String query)

The modified code is as follows:

// In org.apache.sqoop.mapreduce.DataDrivenImportJob:
 
/**
   * Build the boundary query for the column of the result set created by
   * the given query.
   * @param col column name whose boundaries we're interested in.
   * @param query sub-query used to create the result set.
   * @return input boundary query as a string
   */
  private String buildBoundaryQuery(String col, String query) {
    if (col == null || options.getNumMappers() == 1) {
      return "";
    }
 
    // Replace table name with alias 't1' if column name is a fully
    // qualified name.  This is needed because "tableName"."columnName"
    // in the input boundary query causes a SQL syntax error in most dbs
    // including Oracle and MySQL.
    String alias = "t1";
    int dot = col.lastIndexOf('.');
    String qualifiedName = (dot == -1) ? col : alias + col.substring(dot);
 
    ConnManager mgr = getContext().getConnManager();
    String ret = mgr.getInputBoundsQuery(qualifiedName, query);
    if (ret != null) {
      return ret;
    }
 
//    return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
//        + "FROM (" + query + ") AS " + alias;
    return initBoundaryQuery(qualifiedName, query, alias);
  }
 
  private String initBoundaryQuery(String qualifiedName, String query, String alias) {
    StringBuilder regex = new StringBuilder();
    regex.append("(\\s[A|a][S|s][\\s][`]?");
    for (char c : qualifiedName.toCharArray()) {
      regex.append('[').append(c).append(']');
    }
    regex.append("[`|\\s|,])");
    final Matcher matcher1 = Pattern.compile(regex.toString()).matcher(query);
    final boolean asCheckOk = !matcher1.find();
    if(asCheckOk) {
      final Matcher matcher2 = Pattern.compile("(\\s[F|f][R|r][O|o][M|m]\\s)").matcher(query);
      int count = 0;
      while (matcher2.find()) {
        count++;
      }
      boolean fromCheckOk = count == 1;
      if(fromCheckOk) {
        final Matcher matcher = Pattern.compile("(\\s[F|f][R|r][O|o][M|m]\\s[\\s\\S]*)").matcher(query);
        while (matcher.find()) {
          return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
                  + matcher.group();
        }
      }
    }
    return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
            + "FROM (" + query + ") AS " + alias;
  }
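
The intent of the rewrite is to skip the derived table entirely: when it is safe, the SELECT list is dropped and MIN/MAX are applied directly over the original FROM/WHERE clause, so MySQL can answer the boundary query from an index instead of materializing a temporary table. The two regex checks act as guards: the rewrite is only attempted when nothing in the query is aliased to the split-column name and the query contains exactly one FROM; in every other case the method falls back to the original subquery form.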

Problem 3: special-character replacement cannot be customized

Solution 1:

Use SQL's string-replacement functions. This requires modifying every script, which is costly and makes the scripts much less readable and maintainable.

Solution 2:

Modify the Sqoop implementation and add a configurable special-character replacement feature (a rough sketch follows).
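
A hedged sketch of what such a replacement could look like (a hypothetical helper, not part of Sqoop): applied to every field value before it is written out, so embedded delimiters or newlines do not break the HDFS text layout:

public final class FieldSanitizer {
  private final char[] specials;
  private final String replacement;

  public FieldSanitizer(String specialChars, String replacement) {
    this.specials = specialChars.toCharArray();
    this.replacement = replacement;
  }

  // Replace every configured special character in a field value.
  public String clean(String value) {
    if (value == null) {
      return null;
    }
    String result = value;
    for (char c : specials) {
      result = result.replace(String.valueOf(c), replacement);
    }
    return result;
  }

  public static void main(String[] args) {
    FieldSanitizer sanitizer = new FieldSanitizer("\n\t\u0001", " ");
    System.out.println(sanitizer.clean("a\tb\nc"));   // prints "a b c"
  }
}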