Understanding Flink in Depth: The Internal Structure of Metrics
Published: 2019-06-08


Starting with How Metrics Are Used

Flink has four kinds of metrics: Counters, Gauges, Histograms and Meters.

How are metrics used? Take a Counter as an example:

 1 public class MyMapper extends RichMapFunction<String, String> {
 2     private transient Counter counter;
 3 
 4     @Override
 5     public void open(Configuration config) {
 6         this.counter = getRuntimeContext()
 7                 .getMetricGroup()
 8                 .counter("myCounter");
 9     }
10 
11     @Override
12     public String map(String value) throws Exception {
13         this.counter.inc();
14         return value;
15     }
16 }

Line 7: getMetricGroup() obtains the MetricGroup.

Line 8: obtains a Metric instance (here a Counter named "myCounter") from the MetricGroup.
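The following is a minimal plain-Java sketch of the same pattern that runs without a Flink dependency. SketchCounter, SketchSimpleCounter and SketchMapper are hypothetical stand-ins for Flink's Counter, SimpleCounter and MyMapper. The sketch shows only one thing: the metric object is obtained once in open() and updated once per record in map().

```java
// Hypothetical stand-ins for Flink's Counter interface and SimpleCounter class,
// so this example runs without Flink on the classpath.
interface SketchCounter {
    void inc();
    long getCount();
}

final class SketchSimpleCounter implements SketchCounter {
    private long count; // plain field, no synchronization: one counter per function instance

    @Override public void inc() { count++; }
    @Override public long getCount() { return count; }
}

// Mirrors MyMapper: create the metric once in open(), update it per record in map().
final class SketchMapper {
    private SketchCounter counter;

    void open() {
        // In Flink this would be getRuntimeContext().getMetricGroup().counter("myCounter").
        this.counter = new SketchSimpleCounter();
    }

    String map(String value) {
        counter.inc();
        return value;
    }

    long recordsSeen() { return counter.getCount(); }
}
```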

So let's take a closer look at MetricGroup.

The Metric Container: MetricGroup

A MetricGroup is a container for Metric objects and for metric subgroups.

Each of the following four methods takes a Metric object, registers it by calling addMetric(), and returns it.

(AbstractMetricGroup.java)

public <C extends Counter> C counter(String name, C counter) {
    addMetric(name, counter);
    return counter;
}

public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
    addMetric(name, gauge);
    return gauge;
}

public <H extends Histogram> H histogram(String name, H histogram) {
    addMetric(name, histogram);
    return histogram;
}

public <M extends Meter> M meter(String name, M meter) {
    addMetric(name, meter);
    return meter;
}

Note 1: UnregisteredMetricsGroup, another implementation of the MetricGroup interface, simply returns the Metric instance without registering it.

Note 2: ProxyMetricGroup, a third implementation of the MetricGroup interface, holds a parent MetricGroup and forwards every call to that parent.
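The small sketch below contrasts the three behaviours. SketchGroup, RegisteringGroup, NoOpGroup and ForwardingGroup are hypothetical names, and the interface has a single method instead of Flink's full MetricGroup API. RegisteringGroup plays the role of AbstractMetricGroup, NoOpGroup the role of UnregisteredMetricsGroup, and ForwardingGroup the role of ProxyMetricGroup.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical one-method stand-in for MetricGroup (the real interface is much larger).
interface SketchGroup {
    <C> C counter(String name, C counter);
}

// Like AbstractMetricGroup: keeps (registers) the metric, then returns it.
final class RegisteringGroup implements SketchGroup {
    final Map<String, Object> metrics = new HashMap<>();

    @Override public <C> C counter(String name, C counter) {
        metrics.putIfAbsent(name, counter);
        return counter;
    }
}

// Like UnregisteredMetricsGroup: hands the metric straight back, registers nothing.
final class NoOpGroup implements SketchGroup {
    @Override public <C> C counter(String name, C counter) {
        return counter;
    }
}

// Like ProxyMetricGroup: every call is forwarded to the parent group.
final class ForwardingGroup implements SketchGroup {
    private final SketchGroup parent;

    ForwardingGroup(SketchGroup parent) { this.parent = parent; }

    @Override public <C> C counter(String name, C counter) {
        return parent.counter(name, counter);
    }
}
```

Because all three return the metric they were given, user code such as MyMapper works the same whichever implementation it receives. Only the side effect (registration) differs.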

 

(AbstractMetricGroup.java) Important fields

/** The registry that this metrics group belongs to. */
protected final MetricRegistry registry;

/** All metrics that are directly contained in this group. */
private final Map<String, Metric> metrics = new HashMap<>();

/** All metric subgroups of this group. */
private final Map<String, AbstractMetricGroup> groups = new HashMap<>();

/** Flag indicating whether this group has been closed. */
private volatile boolean closed;

(AbstractMetricGroup.java)

 1 protected void addMetric(String name, Metric metric) {
 2     if (metric == null) {
 3         LOG.warn("Ignoring attempted registration of a metric due to being null for name {}.", name);
 4         return;
 5     }
 6     // add the metric only if the group is still open
 7     synchronized (this) {
 8         if (!closed) {
 9             // immediately put without a 'contains' check to optimize the common case (no collision)
10             // collisions are resolved later
11             Metric prior = metrics.put(name, metric);
12 
13             // check for collisions with other metric names
14             if (prior == null) {
15                 // no other metric with this name yet
16 
17                 if (groups.containsKey(name)) {
18                     // we warn here, rather than failing, because metrics are tools that should not fail the
19                     // program when used incorrectly
20                     LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" +
21                             name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
22                 }
23 
24                 registry.register(metric, name, this);
25             }
26             else {
27                 // we had a collision. put back the original value
28                 metrics.put(name, prior);
29 
30                 // we warn here, rather than failing, because metrics are tools that should not fail the
31                 // program when used incorrectly
32                 LOG.warn("Name collision: Group already contains a Metric with the name '" +
33                         name + "'. Metric will not be reported." + Arrays.toString(scopeComponents));
34             }
35         }
36     }
37 }

Let's walk through addMetric() in detail.

Line 7: acquires the mutex (the group's own monitor, `this`).

Line 8: checks whether the group has already been closed.

Lines 11-34: add the Metric object being registered to the metrics map.

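One small trick here: the code assumes there is no key collision and puts the metric into the map right away, then checks afterwards whether an existing value was displaced. This improves performance in the common case. When there is no collision, it saves one map lookup compared with calling containsKey() before put().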

Line 24: registers the Metric with the MetricRegistry. This is covered in detail in the next section.
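The put-first idea can be isolated in a simplified sketch. It assumes a plain HashMap and replaces the MetricRegistry and the logger with two lists. PutFirstGroup and its fields are hypothetical. On the common path, a single put() both inserts the metric and reports whether the name was already taken. Only the rare collision pays for a second put() to restore the original metric.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the put-first trick in addMetric().
final class PutFirstGroup {
    private final Map<String, Object> metrics = new HashMap<>();
    final List<String> registered = new ArrayList<>(); // stands in for registry.register(...)
    final List<String> warnings = new ArrayList<>();   // stands in for LOG.warn(...)
    private boolean closed;

    synchronized void addMetric(String name, Object metric) {
        if (metric == null || closed) {
            return;
        }
        // One put() inserts the metric and tells us whether the name was taken.
        Object prior = metrics.put(name, metric);
        if (prior == null) {
            registered.add(name);
        } else {
            // Collision (rare): restore the original metric and drop the new one.
            metrics.put(name, prior);
            warnings.add("Name collision: " + name);
        }
    }

    synchronized void close() { closed = true; }

    synchronized Object get(String name) { return metrics.get(name); }
}
```

As in Flink's code, the first metric registered under a name wins. The later one is neither stored nor registered.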

 

Subgroups are added by calling addGroup():

(AbstractMetricGroup.java)

 1 private AbstractMetricGroup<?> addGroup(String name, ChildType childType) {
 2     synchronized (this) {
 3         if (!closed) {
 4             // adding a group with the same name as a metric creates problems in many reporters/dashboards
 5             // we warn here, rather than failing, because metrics are tools that should not fail the
 6             // program when used incorrectly
 7             if (metrics.containsKey(name)) {
 8                 LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" +
 9                         name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
10             }
11 
12             AbstractMetricGroup newGroup = createChildGroup(name, childType);
13             AbstractMetricGroup prior = groups.put(name, newGroup);
14             if (prior == null) {
15                 // no prior group with that name
16                 return newGroup;
17             } else {
18                 // had a prior group with that name, add the prior group back
19                 groups.put(name, prior);
20                 return prior;
21             }
22         }
23         else {
24             // return a non-registered group that is immediately closed already
25             GenericMetricGroup closedGroup = new GenericMetricGroup(registry, this, name);
26             closedGroup.close();
27             return closedGroup;
28         }
29     }
30 }
31 
32 protected GenericMetricGroup createChildGroup(String name, ChildType childType) {
33     switch (childType) {
34         case KEY:
35             return new GenericKeyMetricGroup(registry, this, name);
36         default:
37             return new GenericMetricGroup(registry, this, name);
38     }
39 }
40 
41 /**
42  * Enum for indicating which child group should be created.
43  * `KEY` is used to create {@link GenericKeyMetricGroup}.
44  * `VALUE` is used to create {@link GenericValueMetricGroup}.
45  * `GENERIC` is used to create {@link GenericMetricGroup}.
46  */
47 protected enum ChildType {
48     KEY,
49     VALUE,
50     GENERIC
51 }

Line 2: acquires the mutex.

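Lines 12-21: create the new MetricGroup object. If a subgroup with that name already exists, the existing subgroup is put back and returned instead.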

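Note that giving a subgroup the same name as a Metric causes problems in many reporters and dashboards. The code therefore logs a warning (lines 7-10) rather than failing.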

Lines 25, 35 and 37: all MetricGroup objects in the same tree share the same MetricRegistry.

Line 26: closes the MetricGroup. close() is implemented as follows:

 1 public void close() {
 2     synchronized (this) {
 3         if (!closed) {
 4             closed = true;
 5 
 6             // close all subgroups
 7             for (AbstractMetricGroup group : groups.values()) {
 8                 group.close();
 9             }
10             groups.clear();
11 
12             // un-register all directly contained metrics
13             for (Map.Entry<String, Metric> metric : metrics.entrySet()) {
14                 registry.unregister(metric.getValue(), metric.getKey(), this);
15             }
16             metrics.clear();
17         }
18     }
19 }

Line 2: acquires the mutex.

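It then recursively closes all subgroups and unregisters all metrics contained directly in the group.

In MetricGroup, addMetric(), addGroup(), close() and getAllVariables() (not shown above) all acquire this mutex.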

The reason is to prevent the resource leaks that would occur if metrics or subgroups were added while the group is being closed.
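The following sketch puts addMetric(), addGroup() and close() together under the group's own monitor. This shared lock is what prevents a concurrent add from slipping in after close() has already unregistered everything. ClosableGroup is hypothetical, and a shared List<String> stands in for the MetricRegistry. For brevity, the sketch uses putIfAbsent() and computeIfAbsent() instead of the put-then-restore code shown earlier.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of AbstractMetricGroup's add/close logic (not Flink code).
// The shared List stands in for the MetricRegistry and should be thread-safe.
final class ClosableGroup {
    private final List<String> registry;                        // shared by the whole tree
    private final Map<String, Object> metrics = new HashMap<>();
    private final Map<String, ClosableGroup> groups = new HashMap<>();
    private boolean closed;                                     // guarded by "this"

    ClosableGroup(List<String> registry) { this.registry = registry; }

    synchronized void addMetric(String name, Object metric) {
        if (!closed && metrics.putIfAbsent(name, metric) == null) {
            registry.add(name);                                 // "register"
        }
    }

    synchronized ClosableGroup addGroup(String name) {
        if (closed) {
            // Parent already closed: return a detached group that is closed at once,
            // so anything added to it is never registered (and so never leaked).
            ClosableGroup detached = new ClosableGroup(registry);
            detached.close();
            return detached;
        }
        // Asking for the same name twice returns the existing subgroup.
        return groups.computeIfAbsent(name, n -> new ClosableGroup(registry));
    }

    synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        for (ClosableGroup child : groups.values()) {
            child.close();                                      // recursive close
        }
        groups.clear();
        for (String name : metrics.keySet()) {
            registry.remove(name);                              // "unregister"
        }
        metrics.clear();
    }
}
```

Locks are always taken parent first, then child (close() holds the parent's monitor while closing children). Nothing takes them in the opposite order, so the nesting cannot deadlock.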

 

Another important MetricGroup method is public String getMetricIdentifier(String metricName, CharacterFilter filter, int reporterIndex).

It returns the unique name (the identifier) of a given Metric.

An identifier consists of three parts: the system scope, the user scope and the metric name.

In A.B.C, A is the system scope, B is the user scope, C is the metric name, and '.' is the delimiter.

The system scope can be defined in conf/flink-conf.yaml.

The user scope is the group tree, defined by calling addGroup(String). Groups can be nested several levels deep.
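As an illustration of how the three parts are combined, here is a minimal sketch. IdentifierSketch.identifier() is a hypothetical helper, not Flink's getMetricIdentifier(). It passes every component through a character filter and joins the results with the delimiter. As far as I know, Flink's reporterIndex argument lets each reporter use its own delimiter and cache its own identifier strings. The sketch leaves that out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical helper: system scope + user scope + metric name, filtered and joined.
final class IdentifierSketch {
    static String identifier(List<String> systemScope, List<String> userScope,
                             String metricName, char delimiter,
                             UnaryOperator<String> filter) {
        List<String> parts = new ArrayList<>();
        for (String s : systemScope) {
            parts.add(filter.apply(s));      // e.g. host, taskmanager id, job name
        }
        for (String s : userScope) {
            parts.add(filter.apply(s));      // groups added via addGroup(String)
        }
        parts.add(filter.apply(metricName)); // the metric's own name comes last
        return String.join(String.valueOf(delimiter), parts);
    }
}
```

The filter matters when a component contains the delimiter itself. For example, replacing '.' with '_' turns a host named "host.local" into "host_local". Without that replacement, the host name would look like two separate scope levels.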

 

MetricRegistry: The Bridge Between MetricGroup and MetricReporter

MetricRegistry keeps track of all registered metrics and serves as the bridge between MetricGroup and MetricReporter.

MetricGroup's addMetric() method calls MetricRegistry's register() method:

registry.register(metric, name, this);

MetricGroup's close() method calls MetricRegistry's unregister() method:

registry.unregister(metric.getValue(), metric.getKey(), this);
 1 // ------------------------------------------------------------------------
 2 //  Metrics (de)registration
 3 // ------------------------------------------------------------------------
 4 
 5 @Override
 6 public void register(Metric metric, String metricName, AbstractMetricGroup group) {
 7     synchronized (lock) {
 8         if (isShutdown()) {
 9             LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
10         } else {
11             if (reporters != null) {
12                 for (int i = 0; i < reporters.size(); i++) {
13                     MetricReporter reporter = reporters.get(i);
14                     try {
15                         if (reporter != null) {
16                             FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
17                             reporter.notifyOfAddedMetric(metric, metricName, front);
18                         }
19                     } catch (Exception e) {
20                         LOG.warn("Error while registering metric.", e);
21                     }
22                 }
23             }
24             try {
25                 if (queryService != null) {
26                     MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
27                 }
28             } catch (Exception e) {
29                 LOG.warn("Error while registering metric.", e);
30             }
31             try {
32                 if (metric instanceof View) {
33                     if (viewUpdater == null) {
34                         viewUpdater = new ViewUpdater(executor);
35                     }
36                     viewUpdater.notifyOfAddedView((View) metric);
37                 }
38             } catch (Exception e) {
39                 LOG.warn("Error while registering metric.", e);
40             }
41         }
42     }
43 }
44 
45 @Override
46 public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
47     synchronized (lock) {
48         if (isShutdown()) {
49             LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
50         } else {
51             if (reporters != null) {
52                 for (int i = 0; i < reporters.size(); i++) {
53                     try {
54                         MetricReporter reporter = reporters.get(i);
55                         if (reporter != null) {
56                             FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
57                             reporter.notifyOfRemovedMetric(metric, metricName, front);
58                         }
59                     } catch (Exception e) {
60                         LOG.warn("Error while registering metric.", e);
61                     }
62                 }
63             }
64             try {
65                 if (queryService != null) {
66                     MetricQueryService.notifyOfRemovedMetric(queryService, metric);
67                 }
68             } catch (Exception e) {
69                 LOG.warn("Error while registering metric.", e);
70             }
71             try {
72                 if (metric instanceof View) {
73                     if (viewUpdater != null) {
74                         viewUpdater.notifyOfRemovedView((View) metric);
75                     }
76                 }
77             } catch (Exception e) {
78                 LOG.warn("Error while registering metric.", e);
79             }
80         }
81     }
82 }

register() and unregister() are nearly identical.

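Line 7: acquires the lock. Unlike MetricGroup, which synchronizes on `this`, the registry synchronizes on a dedicated lock object created with `new Object()`. This makes it easier to introduce a second lock later.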

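Lines 11-23: notify every MetricReporter held by the registry of the new Metric. Each reporter receives the group wrapped in a FrontMetricGroup that carries that reporter's index (lines 16-17).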

Lines 24-30: add the Metric to the MetricQueryService.

MetricQueryService is an actor. It serializes the metrics and writes them to an output stream.
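The notification fan-out in register() can be sketched as follows. The sketch assumes a hypothetical MetricListener interface in place of MetricReporter and MetricQueryService, and FanOutRegistry is likewise hypothetical. It keeps three properties visible in the code above:
- a dedicated lock object;
- a shutdown check;
- a try/catch around each listener, so that one failing reporter does not prevent the others from being notified.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MetricReporter / the query-service notification.
interface MetricListener {
    void notifyOfAddedMetric(Object metric, String name);
}

// Simplified model of the registry's register() fan-out (not Flink code).
final class FanOutRegistry {
    private final Object lock = new Object();       // dedicated lock, not "this"
    private final List<MetricListener> reporters = new ArrayList<>();
    private boolean shutdown;
    int failures;                                   // Flink logs a warning instead

    void addReporter(MetricListener r) {
        synchronized (lock) { reporters.add(r); }
    }

    void shutdown() {
        synchronized (lock) { shutdown = true; }
    }

    void register(Object metric, String name) {
        synchronized (lock) {
            if (shutdown) {
                return;                             // Flink: "Cannot register metric ..."
            }
            for (MetricListener r : reporters) {
                try {
                    r.notifyOfAddedMetric(metric, name);
                } catch (Exception e) {
                    failures++;                     // isolate the failing listener
                }
            }
        }
    }
}
```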

Lines 31-40: if the Metric implements the View interface, it is registered with the viewUpdater.

A Metric class that implements View is updated at a fixed time interval. The viewUpdater performs the update.
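To show what the periodic update buys, here is a sketch of a rate meter in the spirit of Flink's MeterView. SketchView and RateView are hypothetical names.
- markEvent() only increments a counter, so the hot path stays cheap.
- The viewUpdater would call update() at a fixed interval. Flink uses 5 seconds in the versions I have looked at; treat that value as an assumption.
- update() turns the count into events per second over a sliding window of update intervals.

```java
// Hypothetical stand-in for Flink's View interface.
interface SketchView {
    void update(); // called periodically by an updater thread
}

// Rate meter: cheap markEvent(), rate recomputed only in update().
final class RateView implements SketchView {
    private final long intervalSeconds;
    private final long[] history; // counts sampled at the last windowUpdates + 1 updates
    private int index;
    private long count;
    private double rate;

    RateView(long intervalSeconds, int windowUpdates) {
        this.intervalSeconds = intervalSeconds;
        this.history = new long[windowUpdates + 1];
    }

    synchronized void markEvent(long n) { count += n; }

    @Override
    public synchronized void update() {
        index = (index + 1) % history.length;
        history[index] = count;
        // The slot after the current one holds the sample from windowUpdates intervals ago.
        long oldest = history[(index + 1) % history.length];
        rate = (double) (count - oldest) / (intervalSeconds * (history.length - 1));
    }

    synchronized double getRate() { return rate; }
}
```

Take intervalSeconds = 5 and windowUpdates = 2, so the window spans 10 seconds. Marking 50 events before the first update() then yields a rate of 5.0 events per second.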

 

MetricReporter

A MetricReporter exports metrics to an external backend.

The parameters of each external backend are set in conf/flink-conf.yaml.

Several external backends can be configured at the same time.

The MetricReporter interface:

 1 public interface MetricReporter {
 2 
 3     // ------------------------------------------------------------------------
 4     //  life cycle
 5     // ------------------------------------------------------------------------
 6 
 7 
 8     void open(MetricConfig config);
 9 
10     void close();
11 
12     // ------------------------------------------------------------------------
13     //  adding / removing metrics
14     // ------------------------------------------------------------------------
15 
16     void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
17 
18 
19     void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
20 }

Line 8: configures the reporter.

Because a reporter's constructor takes no arguments, this method is where the reporter's fields are initialized.

It is always called right after the object has been constructed.

Line 10: closes the reporter.

This method should close channels and streams and release any other resources.

Lines 16 and 19: called when a metric is added or removed.
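A reporter that only tracks which metrics currently exist can be sketched as below. SketchReporter mirrors the life cycle of MetricReporter but is hypothetical, as is InMemoryReporter. The sketch makes three substitutions:
- java.util.Properties holds the configuration (Flink's MetricConfig extends Properties).
- A plain String stands in for the MetricGroup scope.
- The "prefix" key is invented for the example.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical mirror of the MetricReporter life cycle (not Flink's interface).
interface SketchReporter {
    void open(Properties config);
    void close();
    void notifyOfAddedMetric(Object metric, String metricName, String scope);
    void notifyOfRemovedMetric(Object metric, String metricName, String scope);
}

final class InMemoryReporter implements SketchReporter {
    final Map<String, Object> metrics = new ConcurrentHashMap<>();
    private String prefix = "";

    public InMemoryReporter() {} // no-arg constructor: all configuration arrives via open()

    @Override public void open(Properties config) {
        prefix = config.getProperty("prefix", ""); // hypothetical config key
    }

    @Override public void close() {
        metrics.clear(); // a real reporter would close sockets/streams here
    }

    @Override public void notifyOfAddedMetric(Object metric, String metricName, String scope) {
        metrics.put(prefix + scope + "." + metricName, metric);
    }

    @Override public void notifyOfRemovedMetric(Object metric, String metricName, String scope) {
        metrics.remove(prefix + scope + "." + metricName);
    }
}
```

A ConcurrentHashMap is used because, in the sketch's assumed setup, notifications and reads may come from different threads.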

 

A regular reporter class also implements the Scheduled interface in order to report the current measurements:

1 public interface Scheduled {
2 
3     void report();
4 }

Line 3: the metric registry calls report() periodically to report the current measurements.
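Here is a minimal sketch of a Scheduled reporter together with the caller side. A plain ScheduledExecutorService stands in for the metric registry's own scheduling. SketchScheduled, SnapshotReporter and ReportScheduler are hypothetical names. Each time report() runs, it reads the current value of a counter and records it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical mirror of the Scheduled interface.
interface SketchScheduled {
    void report();
}

// Snapshots one counter value each time report() is invoked.
final class SnapshotReporter implements SketchScheduled {
    private final LongSupplier counter;
    final AtomicLong reports = new AtomicLong();
    volatile long lastValue;

    SnapshotReporter(LongSupplier counter) { this.counter = counter; }

    @Override public void report() {
        lastValue = counter.getAsLong();
        reports.incrementAndGet();
    }
}

// Stand-in for the registry side: invoke report() periodically on a background thread.
final class ReportScheduler {
    static ScheduledExecutorService start(SketchScheduled reporter, long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(reporter::report, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

The caller owns the executor and should shut it down, for example from the reporter's close(). Otherwise its non-daemon thread keeps the JVM alive.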

 

Reposted from: https://www.cnblogs.com/tuowang/p/9108095.html
