副标题[/!--empirenews.page--]
?
项目背景:
? ? (1) 已有监控系统采用的OpenTSDB方案
? ? (2) ?目前一些大数据应用,尤其是基于spark streaming的流式应用,会实时计算生成一些指标数据,借用监控系统的存储。
? ? (3) 需要前端展示实时分析结果,采用zeppelin展示方式,但是目前zeppelin不支持OpenTSDB后端引擎支持
? ?So,自己开发!
?
一 Interpreter插件流程
? ??? ? 插播: ? ?刚去访问官方发现0.6.0版本发布了!?http://zeppelin.apache.org/docs/0.6.0/
? ? (1) 下载Zeppelin源码
? ? (2) ?创建Zeppelin Maven工程的 Module
???? 
? ? (3) 添加对zeppelin-interpreter插件包的依赖
???????? 
? ? ? ? ?由于Zeppelin运行环境已经有了该依赖包,所以我们再创建自定义Interpreter插件的时候只需要在代码中对其依赖,打包过程中不需要打包该包。所以使用provided依赖方式。
? ? ?(4) 添加对OpenTSDB客户端操作API包的依赖
???? 
? ? ? ?注意:该包为内部开发依赖包
? ? ? (5) 创建实现类继承Zeppelin提供的抽象类org.apache.zeppelin.interpreter.Interpreter;
? ??????????public class TsdInterpreter extends Interpreter
?
? ? ? ?(6) 代码中注册当前插件
? ? ? ? ? 在实现类中添加以下代码实现当前插件的注册
????? ? ? ??static {
? ? ? ? ? ? ? Interpreter.register("tsd","tsd",TsdInterpreter.class.getName()); ? ? ? ? ? }
? ? ? ? ? 以tsd名称注册,那么Zeppelin前端在调用OpenTSDB查询的时候,只需要指定后端引擎名称%tsd即可。
? ? ? ?(7) 实现核心抽象方法,即Zeppelin前端提交过来的命令
??????????public InterpreterResult interpret(String cmd,InterpreterContext context)
? ? ? ? ? cmd: 即在Zeppelin交互式界面编写的命令,不包含%tsd
? ? ? ? ? context: 当前插件的上下文,主要包含插件的配置信息,例如操作OpenTSDB的时候就需要从上下文中获取OpenTSDB的IP和端口参数。
????????????
????? ? ? 该方法实现的核心思想就是: ? 解析命令=>实例化OpenTSDB操作客户端=>操作OpenTSDB客户端进行数据查询=> 获取返回结果 封装成InterpreterResult对象。
?
????????? ? 贴核心代码吧:
????????????
? ? ? ? ? Properties intpProperty = getProperty(); ? ? ? ? ? for (Object k : intpProperty.keySet()) { ?? ??? ??? ?String key = (String) k; ?? ??? ??? ?String value = (String) intpProperty.get(key);
?? ??? ??? ?if (key.equals("tsd.host") ) { ?? ??? ??? ??? ?host = value; ?? ??? ??? ?} else if (key.equals("tsd.port")) { ?? ??? ??? ??? ?port = value; ?? ??? ??? ?} ?? ??? ?} ? ? ? ? ?propertiesUtil.setOpentsdbIp(host); ? ? ? ? ? propertiesUtil.setPort(Integer.parseInt(port));
? ? ? ? ? ? Scanner scanner = new Scanner(items[1]); ?? ??? ??? ?String start,end,metric,tagsStr; ?? ??? ??? ?if (scanner.hasNext()) ?? ??? ??? ??? ?start = scanner.next(); ?? ??? ??? ?else { ?? ??? ??? ??? ?return processHelp(InterpreterResult.Code.ERROR, ?? ??? ??? ??? ??? ??? ?"1!Please enter the correct format!"); ?? ??? ??? ?} ?? ??? ??? ? ?? ??? ??? ?if (scanner.hasNext()) ?? ??? ??? ??? ?end = scanner.next(); ?? ??? ??? ?else { ?? ??? ??? ??? ?return processHelp(InterpreterResult.Code.ERROR, ?? ??? ??? ??? ??? ??? ?"2!Please enter the correct format!"); ?? ??? ??? ?} ?? ??? ??? ? ?? ??? ??? ?if (scanner.hasNext()) ?? ??? ??? ??? ?metric = scanner.next(); ?? ??? ??? ?else { ?? ??? ??? ??? ?return processHelp(InterpreterResult.Code.ERROR, ?? ??? ??? ??? ??? ??? ?"3!Please enter the correct format!"); ?? ??? ??? ?} ?? ??? ??? ? ?? ??? ??? ?if (scanner.hasNext()) ?? ??? ??? ??? ?tagsStr = scanner.next(); ?? ??? ??? ?else { ?? ??? ??? ??? ?return processHelp(InterpreterResult.Code.ERROR, ?? ??? ??? ??? ??? ??? ?"4!Please enter the correct format!"); ?? ??? ??? ?} ?? ??? ??? ?
?? ??? ??? ?// cpid=tudou,busiid=*,code=1 ?? ??? ??? ?String[] tagsStrs = tagsStr.split(","); ?? ??? ??? ?Map<String,String> tags = new HashMap<String,String>(); ?? ??? ??? ?for (String s : tagsStrs) { ?? ??? ??? ??? ?int index = s.indexOf('='); ?? ??? ??? ??? ?if (index == -1) ?? ??? ??? ??? ??? ?continue; ?? ??? ??? ??? ?String tagK = s.substring(0,index); ?? ??? ??? ??? ?String tagV = s.substring(index + 1); ?? ??? ??? ??? ?tags.put(tagK,tagV); ?? ??? ??? ?}
?? ??? ??? ?QueryService queryService = new QueryService(); ?? ??? ??? ?try { ?? ??? ??? ??? ?List<QueryResponseEntity> responses = queryService ?? ??? ??? ??? ??? ??? ?.queryByMetric(start,tags,null,"sum");
?? ??? ??? ??? ?StringBuffer sb = new StringBuffer(); //?? ??? ??? ??? ?Map<String,String> alldps = new HashMap<String,String>();
?? ??? ??? ??? ?// build header ?? ??? ??? ??? ?Set<String> keys = new HashSet<String>(); ?? ??? ??? ??? ?sb.append("timet"); ?? ??? ??? ??? ?for (QueryResponseEntity st : responses) { ?? ??? ??? ??? ??? ?sb.append(st.getTags().toString() + "t"); ?? ??? ??? ??? ??? ?keys.addAll(st.getDps().keySet()); ?? ??? ??? ??? ?} ?? ??? ??? ??? ?sb.replace(sb.lastIndexOf("t"),sb.lastIndexOf("t") + 1,"n");
?? ??? ??? ??? ?List<String> keys2 = new ArrayList<String>(keys); ?? ??? ??? ??? ?Collections.sort(keys2); ?? ??? ??? ??? ?// build lines ?? ??? ??? ??? ?Iterator<String> it = keys2.iterator(); ?? ??? ??? ??? ? ?? ??? ??? ??? ?long t; ?? ??? ??? ??? ?while (it.hasNext()) { ?? ??? ??? ??? ??? ?String key = it.next(); // 每一行的时间戳 ?? ??? ??? ??? ??? ? ?? ??? ??? ??? ??? ?t = Long.parseLong(key); ?? ??? ??? ??? ??? ?sb.append(sdf.format(new Date(t*1000)) + "t"); ?? ??? ??? ??? ??? ?for (QueryResponseEntity st : responses) { ?? ??? ??? ??? ??? ??? ?Map<String,String> dps = st.getDps(); ?? ??? ??? ??? ??? ??? ?String value = dps.get(key); ?? ??? ??? ??? ??? ??? ?if (value != null) { ?? ??? ??? ??? ??? ??? ??? ?sb.append(value + "t"); ?? ??? ??? ??? ??? ??? ?} else { ?? ??? ??? ??? ??? ??? ??? ?sb.append(" t"); ?? ??? ??? ??? ??? ??? ?} ?? ??? ??? ??? ??? ?} ?? ??? ??? ??? ??? ?sb.replace(sb.lastIndexOf("t"), ?? ??? ??? ??? ??? ??? ??? ?"n"); ?? ??? ??? ??? ?} ?? ??? ??? ??? ?// sb.toString()
?? ??? ??? ??? ?return new InterpreterResult(InterpreterResult.Code.SUCCESS, ?? ??? ??? ??? ??? ??? ?InterpreterResult.Type.TABLE,sb.toString());
????二 插件部署
????????? ? (1) ?实现类的配置 ?
????????????????? ? 在ZEPPELIN_HOME/conf/zeppelin-site.xml
? ? ? ? ? ? ? ? ?? 
????? ? ? (2) 拷贝OpenTSDB插件包
? ? ? ? ? ? ? ?在ZEPPELIN_HOME/interpreter
? ? ? ? ? ? ? ? 创建文件夹tsd,将所有依赖包拷贝到该文件夹下
? ? ? ? ? ? ? ? ? ?? 
? ? ? ? ? (3) 重启Zeppelin,在Zeppelin管理界面的 Interpreter中添加 TSD配置
(编辑:西安站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|