博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
storm shell命令源码分析-shell_submission.clj
阅读量:6655 次
发布时间:2019-06-25

本文共 6032 字,大约阅读时间需要 20 分钟。

  hot3.png

当我们在shell里执行storm shell命令是会调用shell_submission.clj里的main函数。shell_submission.clj如下:

shell_submission.clj

;; ns函数声明一个命名空间“backtype.storm.command.shell-submission”(ns backtype.storm.command.shell-submission  ;; :import引用backtype.storm中的StormSubmitter类  (:import [backtype.storm StormSubmitter])  ;; :use引用backtype.storm thrift命名空间中的thrift、util、config和log,并建立连接,这样调用thrift、util、config和log中函数就可以直接使用函数名称不需要加完全限定名  (:use [backtype.storm thrift util config log])  ;; :require引用clojure.string,并使用别名str代替完全限定名clojure.string  (:require [clojure.string :as str])  ;; :gen-class生成java类  (:gen-class))  ;; storm shell命令所执行的main函数,gen-class的默认前缀"-",-main函数可以看成public函数,^String是类型提示符,用于声明参数tmpjarpath是一个字符串,-main函数可以接受多个实参,第一个参数赋值给tmpjarpath,其他参数全部保存在args中,args一个"序列"  (defn -main [^String tmpjarpath & args]  ;; conf绑定集群配置信息map,read-storm-config函数定义在backtype.storm.config命令空间,用于读取集群配置信息,返回包含集群配置信息的map,read-storm-config函数参见其定义部分  (let [conf (read-storm-config)        ;; 从集群配置信息中获取nimbus主机        host (conf NIMBUS-HOST)        ;; 从集群配置信息中获取nimbus thrift server的端口        port (conf NIMBUS-THRIFT-PORT)        ;; 调用StormSubmitter类的静态方法submitJar,将tmpjarpath所标识的jar文件上传到nimbus服务器上,jarpath保存jar文件在nimbus服务器上的路径,submitJar方法参见其定义部分        jarpath (StormSubmitter/submitJar conf tmpjarpath)        ;; concat函数将[host port jarpath]和args进行合并,并保存在args中        args (concat args [host port jarpath])];; str/join将args中的参数用空格进行连接后,作为参数传递给exec-command!函数,执行jar文件中的main方法(exec-command! (str/join " " args))))

当Clojure源文件做为脚本执行时,它们将在运行时被编译为java字节码。它们同样可以提前编译为java字节码(AOT编译)。这改善了Clojure应用的启动时间,并生产了可以运用于java中的.class文件。如果编译过的命名空间中拥有一个叫做-main的函数,那么它就能够作为一个Java应用运行。命令行参数会作为参数传递给这个函数。

read-storm-config函数

;; read-storm-config函数调用Utils类的readStormConfig方法读取集群配置信息,并过滤掉信息中非法配置(defn read-storm-config  []  ;; readStormConfig方法参见其定义部分  (let [conf (clojurify-structure (Utils/readStormConfig))]    ;; 调用validate-configs-with-schemas函数验证配置信息的正确性并删除不正确的配置信息    (validate-configs-with-schemas conf)    conf))

readStormConfig方法

public static Map readStormConfig() {    // 调用readDefaultConfig方法从defaults.yaml配置文件读取集群默认配置信息存入一个map对象ret中    Map ret = readDefaultConfig();    // confFile保存系统变量"storm.conf.file"的值,系统变量"storm.conf.file"保存了用户自定义配置文件的路径    String confFile = System.getProperty("storm.conf.file");    Map storm;    // 如果没有用户自定义配置文件,那么调用findAndReadConfigFile方法读取"storm.yaml"配置文件,将配置信息保存在storm中,否则读取用户自定义配置文件    if (confFile==null || confFile.equals("")) {        storm = findAndReadConfigFile("storm.yaml", false);    } else {        storm = findAndReadConfigFile(confFile, true);    }    // 将"storm.yaml"配置文件或用户自定义的配置文件信息覆盖添加到默认配置信息ret中    ret.putAll(storm);    // 读取命令行提供的配置信息,并覆盖添加到之前的map对象中    ret.putAll(readCommandLineOpts());    // 返回保存了配置信息的map对象    return ret;}

submitJar方法

submitJar方法调用了StormSubmitter类的重载方法submitJar

/** * Submit jar file * @param conf the topology-specific configuration. See {@link Config}. * @param localJar file path of the jar file to submit * @return the remote location of the submitted jar */public static String submitJar(Map conf, String localJar) {    return submitJar(conf, localJar, null);}

重载方法submitJar

submitJar通过thrift client调用nimbus thrift server中的beginFileUpload函数获取目标路径,然后将jar上传到nimbus的目标路径上

/** * Submit jar file * @param conf the topology-specific configuration. See {@link Config}. * @param localJar file path of the jar file to submit * @param listener progress listener to track the jar file upload * @return the remote location of the submitted jar */public static String submitJar(Map conf, String localJar, ProgressListener listener) {    if (localJar == null) {        throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");    }            // 创建nimbus client    NimbusClient client = NimbusClient.getConfiguredClient(conf);    try {            // 调用nimbus thrift server中的beginFileUpload函数获取目标路径,beginFileUpload函数参见其定义部分        String uploadLocation = client.getClient().beginFileUpload();        LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);        BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);        long totalSize = new File(localJar).length();        if (listener != null) {            listener.onStart(localJar, uploadLocation, totalSize);        }        long bytesUploaded = 0;        while(true) {            byte[] toSubmit = is.read();            bytesUploaded += toSubmit.length;            if (listener != null) {                listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);            }            if(toSubmit.length==0) break;            // 调用nimbus thrift server中的uploadChunk函数将jar文件上传nimbus服务器,uploadChunk函数参见其定义部分            client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));        }        // 调用nimbus thrift server中的finishFileUpload完成jar文件上传,finishFileUpload函数参见其定义部分        client.getClient().finishFileUpload(uploadLocation);        if (listener != null) {            listener.onCompleted(localJar, uploadLocation, totalSize);        }        LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);        // 返回jar文件上传nimbus的路径        return uploadLocation;    } catch(Exception e) {        throw new RuntimeException(e);                } finally {        client.close();    }}

beginFileUpload函数

(beginFileUpload [this]    ;; fileloc就是jar上传到nimbus上的目录"{storm.local.dir}/nimubs/inbox/stormjar-(uuid).jar",storm.local.dir是在配置信息中设置的    (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")]      ;; (:uploaders nimbus)获取nimbus元数据中的TimeCacheMap,关于TimeCacheMap将在以后博客详细分析,将fileloc及其对应的FileOutputStream放入TimeCacheMap      (.put (:uploaders nimbus)            fileloc            (Channels/newChannel (FileOutputStream. fileloc)))      (log-message "Uploading file from client to " fileloc)      ;; 返回上传路径      fileloc      ))

shell_submission.clj就分析到这里了,分析过程只列举了一些重要的函数,还有一些辅助函数没有列出,感兴趣的可以自己查看下。

转载于:https://my.oschina.net/drl/blog/658608

你可能感兴趣的文章
制作一个视频播放器
查看>>
wxPython学习1--创建最小的空的wxPython程序
查看>>
Codeforces Round #503 (by SIS, Div. 2)B 1020B Badge (拓扑)
查看>>
CODING常见错误原因
查看>>
一个sql脚本引发的灾难后的思索
查看>>
single-table inheritance 单表继承
查看>>
SQL Email
查看>>
HDU - 6441(费马大定理)
查看>>
[LintCode] 通配符查询
查看>>
[Algorithms] Longest Common Subsequence
查看>>
java中的sql语句中如果有like怎么写
查看>>
day22-Model创建表补充
查看>>
C++ 类成员函数继承(virtual、非virtual)
查看>>
mysql只navicat
查看>>
HDU-2571-命运(DP)
查看>>
ubuntu下C操作Mysql数据库第一步
查看>>
java的Pattern类
查看>>
递归函数与二分查找算法
查看>>
使用Apache JMeter进行SQL优化性能测试
查看>>
在linux上部署jenkins
查看>>