Starting with how Metrics are used
Flink offers four kinds of metrics: Counters, Gauges, Histograms and Meters.
How are metrics used? Take a Counter as an example:
1 public class MyMapper extends RichMapFunction<String, String> {
2     private transient Counter counter;
3
4     @Override
5     public void open(Configuration config) {
6         this.counter = getRuntimeContext()
7             .getMetricGroup()
8             .counter("myCounter");
9     }
10
11     @Override
12     public String map(String value) throws Exception {
13         this.counter.inc();
14         return value;
15     }
16 }
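Line 7: getMetricGroup() returns the MetricGroup.
Line 8: counter("myCounter") obtains a Metric instance (here a Counter named "myCounter") from the MetricGroup.
Next, let's take a closer look at MetricGroup.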
The Metric container: MetricGroup
A MetricGroup is a container for Metric objects and for metric subgroups.
Each of the following four methods registers the given Metric object by calling addMetric() and then returns it.
(AbstractMetricGroup.java)
public <C extends Counter> C counter(String name, C counter) {
    addMetric(name, counter);
    return counter;
}

public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
    addMetric(name, gauge);
    return gauge;
}

public <H extends Histogram> H histogram(String name, H histogram) {
    addMetric(name, histogram);
    return histogram;
}

public <M extends Meter> M meter(String name, M meter) {
    addMetric(name, meter);
    return meter;
}
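Note 1: UnregisteredMetricsGroup, another implementation of the MetricGroup interface, only returns the Metric instance and does not register it.
Note 2: ProxyMetricGroup, a third implementation of the MetricGroup interface, holds a parent MetricGroup and forwards every call to that parent.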
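The register-and-return pattern of these four methods, together with the two alternative behaviours from the notes, can be sketched without Flink on the classpath. This is a minimal illustration, not Flink code: `Metric`, `Counter`, `BasicCounter`, `Group`, `RegisteringGroup`, `NoOpGroup` and `ForwardingGroup` are hypothetical stand-ins that cover counters only. The point is that a generic signature like `<C extends Counter> C counter(String, C)` hands the caller back its own concrete type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for Flink's Metric / Counter types, so the sketch compiles on its own.
interface Metric {}

interface Counter extends Metric {
    void inc();
    long getCount();
}

class BasicCounter implements Counter {
    private long count;
    public void inc() { count++; }
    public long getCount() { return count; }
}

interface Group {
    // Generic signature: the caller gets back the same concrete type it passed in.
    <C extends Counter> C counter(String name, C counter);

    // Convenience overload in the spirit of counter(String): create, hand over, return.
    default Counter counter(String name) {
        return counter(name, new BasicCounter());
    }
}

// Registers every metric it hands out (cf. AbstractMetricGroup.addMetric()).
class RegisteringGroup implements Group {
    final Map<String, Metric> metrics = new HashMap<>();

    public <C extends Counter> C counter(String name, C counter) {
        metrics.put(name, counter);
        return counter;
    }
}

// Returns the metric without registering it (cf. UnregisteredMetricsGroup).
class NoOpGroup implements Group {
    public <C extends Counter> C counter(String name, C counter) {
        return counter;
    }
}

// Forwards every call to a parent group (cf. ProxyMetricGroup).
class ForwardingGroup implements Group {
    private final Group parent;

    ForwardingGroup(Group parent) { this.parent = parent; }

    public <C extends Counter> C counter(String name, C counter) {
        return parent.counter(name, counter);
    }
}
```

Because the group returns the very object it was given, code such as `BasicCounter c = group.counter("myCounter", new BasicCounter());` needs no cast, whichever of the three group flavours is in use.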
(AbstractMetricGroup.java) Important fields
/** The registry that this metrics group belongs to. */
protected final MetricRegistry registry;

/** All metrics that are directly contained in this group. */
private final Map<String, Metric> metrics = new HashMap<>();

/** All metric subgroups of this group. */
private final Map<String, AbstractMetricGroup> groups = new HashMap<>();

/** Flag indicating whether this group has been closed. */
private volatile boolean closed;
(AbstractMetricGroup.java)
1 protected void addMetric(String name, Metric metric) {
2     if (metric == null) {
3         LOG.warn("Ignoring attempted registration of a metric due to being null for name {}.", name);
4         return;
5     }
6     // add the metric only if the group is still open
7     synchronized (this) {
8         if (!closed) {
9             // immediately put without a 'contains' check to optimize the common case (no collision)
10            // collisions are resolved later
11            Metric prior = metrics.put(name, metric);
12
13            // check for collisions with other metric names
14            if (prior == null) {
15                // no other metric with this name yet
16
17                if (groups.containsKey(name)) {
18                    // we warn here, rather than failing, because metrics are tools that should not fail the
19                    // program when used incorrectly
20                    LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" +
21                        name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
22                }
23
24                registry.register(metric, name, this);
25            }
26            else {
27                // we had a collision. put back the original value
28                metrics.put(name, prior);
29
30                // we warn here, rather than failing, because metrics are tools that should not fail the
31                // program when used incorrectly
32                LOG.warn("Name collision: Group already contains a Metric with the name '" +
33                    name + "'. Metric will not be reported." + Arrays.toString(scopeComponents));
34            }
35        }
36    }
37 }
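Let's walk through addMetric() in detail:
Line 7: acquire the monitor lock of this group (synchronized (this)).
Line 8: check whether the group has already been closed.
Lines 11~34: put the Metric into the metrics map and handle name collisions.
A small trick here: the code assumes there is no key collision and puts the metric into the map right away, then checks whether an existing value was displaced. In the common, collision-free case this saves one map lookup compared with calling containsKey() first; on a collision the original metric is put back (line 28), so the first registration wins.
Line 24: register the Metric with the MetricRegistry; this is covered in the next section.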
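The put-then-check trick can be isolated in a few lines of plain Java. This is a minimal sketch, not Flink code: `OptimisticRegistry` is a hypothetical class that stores arbitrary values instead of Metric objects, and it ignores null values the way addMetric() does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry illustrating the "put first, check afterwards" pattern from addMetric().
class OptimisticRegistry<V> {
    private final Map<String, V> entries = new HashMap<>();

    /** Returns true if the value was added, false if it was null or the name was already taken. */
    synchronized boolean add(String name, V value) {
        if (value == null) {
            return false; // addMetric() likewise ignores null metrics
        }
        // Common case (no collision): a single hash lookup.
        // A containsKey() + put() version would always pay for two.
        V prior = entries.put(name, value);
        if (prior == null) {
            return true;
        }
        // Collision: restore the original value, so the first registration wins.
        entries.put(name, prior);
        return false;
    }

    synchronized V get(String name) {
        return entries.get(name);
    }
}
```

The cost moves to the rare collision path, which needs a second put() to restore the displaced value; that trade-off pays off because duplicate metric names are a usage error rather than the norm.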
Calling addGroup() adds a subgroup:
(AbstractMetricGroup.java)
1 private AbstractMetricGroup addGroup(String name, ChildType childType) {
2     synchronized (this) {
3         if (!closed) {
4             // adding a group with the same name as a metric creates problems in many reporters/dashboards
5             // we warn here, rather than failing, because metrics are tools that should not fail the
6             // program when used incorrectly
7             if (metrics.containsKey(name)) {
8                 LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" +
9                     name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
10            }
11
12            AbstractMetricGroup newGroup = createChildGroup(name, childType);
13            AbstractMetricGroup prior = groups.put(name, newGroup);
14            if (prior == null) {
15                // no prior group with that name
16                return newGroup;
17            } else {
18                // had a prior group with that name, add the prior group back
19                groups.put(name, prior);
20                return prior;
21            }
22        }
23        else {
24            // return a non-registered group that is immediately closed already
25            GenericMetricGroup closedGroup = new GenericMetricGroup(registry, this, name);
26            closedGroup.close();
27            return closedGroup;
28        }
29    }
30 }
31
32 protected GenericMetricGroup createChildGroup(String name, ChildType childType) {
33     switch (childType) {
34         case KEY:
35             return new GenericKeyMetricGroup(registry, this, name);
36         default:
37             return new GenericMetricGroup(registry, this, name);
38     }
39 }
40
41 /**
42  * Enum for indicating which child group should be created.
43  * `KEY` is used to create {@link GenericKeyMetricGroup}.
44  * `VALUE` is used to create {@link GenericValueMetricGroup}.
45  * `GENERIC` is used to create {@link GenericMetricGroup}.
46  */
47 protected enum ChildType {
48     KEY,
49     VALUE,
50     GENERIC
51 }
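Line 2: acquire the monitor lock.
Lines 12~21: create the new MetricGroup and put it into the groups map; if a subgroup with the same name already exists, the existing one is put back and returned instead.
Note: lines 7~10 warn when the subgroup's name equals the name of an existing Metric, because such a clash causes problems in many reporters and dashboards.
Lines 25, 35 and 37: every MetricGroup in the same tree uses the same MetricRegistry, since registry is passed on to each child.
Line 26: if this group is already closed, the child created on line 25 is not registered and is closed right away.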
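The addGroup() behaviour walked through above (return the existing subgroup on a name clash, share one registry across the tree, hand back an already-closed child when the parent is closed) fits into a small class. `Node` is a hypothetical type, not part of Flink; the registry is a plain `Object` used only to show that it is passed down to children, and metric handling is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified group tree mirroring the addGroup() behaviour described above.
class Node {
    final String name;
    final Object registry; // shared by every node in the same tree
    private final Map<String, Node> children = new HashMap<>();
    private boolean closed;

    Node(Object registry, String name) {
        this.registry = registry;
        this.name = name;
    }

    synchronized Node addGroup(String childName) {
        if (closed) {
            // Parent already closed: hand back a detached child that is closed immediately.
            Node detached = new Node(registry, childName);
            detached.close();
            return detached;
        }
        Node fresh = new Node(registry, childName);
        Node prior = children.put(childName, fresh);
        if (prior == null) {
            return fresh;
        }
        // A subgroup with this name already exists: put it back and return it.
        children.put(childName, prior);
        return prior;
    }

    // Closes this node and, recursively, all of its children.
    synchronized void close() {
        if (!closed) {
            closed = true;
            for (Node child : children.values()) {
                child.close();
            }
            children.clear();
        }
    }

    synchronized boolean isClosed() {
        return closed;
    }
}
```

Returning the existing subgroup (instead of failing) means two operators that both call `addGroup("a")` end up sharing one node, which matches the "metrics should not fail the program" attitude in the original comments.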
(AbstractMetricGroup.java)
1 public void close() {
2     synchronized (this) {
3         if (!closed) {
4             closed = true;
5
6             // close all subgroups
7             for (AbstractMetricGroup group : groups.values()) {
8                 group.close();
9             }
10            groups.clear();
11
12            // un-register all directly contained metrics
13            for (Map.Entry<String, Metric> metric : metrics.entrySet()) {
14                registry.unregister(metric.getValue(), metric.getKey(), this);
15            }
16            metrics.clear();
17        }
18    }
19 }
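Line 2: acquire the monitor lock.
Then all subgroups are closed recursively and all metrics are unregistered.
addMetric(), addGroup(), close() and getAllVariables() (not shown above) all have to acquire this lock.
The reason: without it, metrics or subgroups could be added while the group is being closed, and they would never be unregistered, i.e. a resource leak.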
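The leak that the shared lock prevents is easy to reproduce in a model. In this sketch, `CountingRegistry` and `GuardedGroup` are hypothetical classes: the registry only counts live registrations, and the group uses `putIfAbsent` instead of the put-then-check trick to stay short. Because `addMetric()` and `close()` synchronize on the same monitor, every metric that gets registered is also unregistered, no matter how the two calls interleave.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the MetricRegistry: just counts live registrations.
class CountingRegistry {
    final AtomicInteger live = new AtomicInteger();
    void register(String name) { live.incrementAndGet(); }
    void unregister(String name) { live.decrementAndGet(); }
}

// Simplified group: addMetric() and close() share one monitor (this), as in AbstractMetricGroup.
class GuardedGroup {
    private final CountingRegistry registry;
    private final Map<String, Object> metrics = new HashMap<>();
    private boolean closed;

    GuardedGroup(CountingRegistry registry) { this.registry = registry; }

    void addMetric(String name, Object metric) {
        synchronized (this) {
            // Without the lock, a thread could pass the !closed check, close() could then run
            // and clear the map, and the registration below would never be undone.
            if (!closed && metrics.putIfAbsent(name, metric) == null) {
                registry.register(name);
            }
        }
    }

    void close() {
        synchronized (this) {
            if (!closed) {
                closed = true;
                for (String name : metrics.keySet()) {
                    registry.unregister(name);
                }
                metrics.clear();
            }
        }
    }
}
```

If a second thread keeps calling `addMetric()` while the main thread calls `close()`, the live count still ends at zero once both threads are done.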
Another important MetricGroup method is public String getMetricIdentifier(String metricName, CharacterFilter filter, int reporterIndex).
It returns a unique name (the identifier) for a given Metric.
An identifier consists of three parts: system scope, user scope and metric name.
In A.B.C, A is the system scope, B the user scope and C the metric name; '.' is the delimiter.
The system scope can be defined in conf/flink-conf.yaml.
The user scope is the group tree, defined by calling addGroup(String); groups can be nested several levels deep.
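Putting the three parts together amounts to joining components with the delimiter, passing each component through the reporter's character filter. The sketch below approximates the shape of what getMetricIdentifier() produces and is not its actual code: `Identifiers.metricIdentifier` is a hypothetical helper, `UnaryOperator<String>` stands in for Flink's `CharacterFilter`, and the delimiter is passed in explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical helper: builds "<system scope>.<user scope>.<metric name>" as described above.
final class Identifiers {
    private Identifiers() {}

    static String metricIdentifier(List<String> systemScope,
                                   List<String> userScope,
                                   String metricName,
                                   UnaryOperator<String> filter,
                                   char delimiter) {
        List<String> parts = new ArrayList<>(systemScope); // A: system scope
        parts.addAll(userScope);                           // B: user scope (the group tree)
        parts.add(metricName);                             // C: metric name
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < parts.size(); i++) {
            if (i > 0) {
                sb.append(delimiter);
            }
            // Each component goes through the reporter-specific character filter.
            sb.append(filter.apply(parts.get(i)));
        }
        return sb.toString();
    }
}
```

With the made-up system scope `localhost`, `taskmanager`, `tm-1`, one user group `myGroup` and the metric `myCounter`, the result is `localhost.taskmanager.tm-1.myGroup.myCounter`. Taking the filter as a parameter is what allows each reporter to get its own sanitized variant of the same identifier.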
The bridge between MetricGroup and MetricReporter: MetricRegistry
The MetricRegistry keeps track of all registered Metrics and acts as the bridge between MetricGroup and MetricReporter.
MetricGroup.addMetric() calls the register() method of the MetricRegistry:
registry.register(metric, name, this);
MetricGroup.close() calls the unregister() method of the MetricRegistry:
registry.unregister(metric.getValue(), metric.getKey(), this);
1 // ------------------------------------------------------------------------
2 //  Metrics (de)registration
3 // ------------------------------------------------------------------------
4
5 @Override
6 public void register(Metric metric, String metricName, AbstractMetricGroup group) {
7     synchronized (lock) {
8         if (isShutdown()) {
9             LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
10        } else {
11            if (reporters != null) {
12                for (int i = 0; i < reporters.size(); i++) {
13                    MetricReporter reporter = reporters.get(i);
14                    try {
15                        if (reporter != null) {
16                            FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
17                            reporter.notifyOfAddedMetric(metric, metricName, front);
18                        }
19                    } catch (Exception e) {
20                        LOG.warn("Error while registering metric.", e);
21                    }
22                }
23            }
24            try {
25                if (queryService != null) {
26                    MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
27                }
28            } catch (Exception e) {
29                LOG.warn("Error while registering metric.", e);
30            }
31            try {
32                if (metric instanceof View) {
33                    if (viewUpdater == null) {
34                        viewUpdater = new ViewUpdater(executor);
35                    }
36                    viewUpdater.notifyOfAddedView((View) metric);
37                }
38            } catch (Exception e) {
39                LOG.warn("Error while registering metric.", e);
40            }
41        }
42    }
43 }
44
45 @Override
46 public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
47     synchronized (lock) {
48         if (isShutdown()) {
49             LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
50         } else {
51             if (reporters != null) {
52                 for (int i = 0; i < reporters.size(); i++) {
53                     try {
54                         MetricReporter reporter = reporters.get(i);
55                         if (reporter != null) {
56                             FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
57                             reporter.notifyOfRemovedMetric(metric, metricName, front);
58                         }
59                     } catch (Exception e) {
60                         LOG.warn("Error while registering metric.", e);
61                     }
62                 }
63             }
64             try {
65                 if (queryService != null) {
66                     MetricQueryService.notifyOfRemovedMetric(queryService, metric);
67                 }
68             } catch (Exception e) {
69                 LOG.warn("Error while registering metric.", e);
70             }
71             try {
72                 if (metric instanceof View) {
73                     if (viewUpdater != null) {
74                         viewUpdater.notifyOfRemovedView((View) metric);
75                     }
76                 }
77             } catch (Exception e) {
78                 LOG.warn("Error while registering metric.", e);
79             }
80         }
81     }
82 }
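register() and unregister() are largely symmetric; taking register() as the example:
Line 7: acquire the lock. The lock object is no longer this but a dedicated new Object(), which makes it easy to add a second lock later (a private lock also cannot be taken by outside code).
Lines 11~23: notify every configured MetricReporter of the new Metric; each reporter has its own try/catch, so one failing reporter does not affect the others.
Lines 24~30: add the Metric to the MetricQueryService.
The MetricQueryService is an actor that serializes the Metrics and writes them to an output stream.
Lines 31~40: if the Metric implements the View interface, register it with the viewUpdater.
A Metric that implements View is updated at a fixed time interval; the viewUpdater performs these updates.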
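The registration flow above reduces to three ideas: a dedicated lock object, fan-out to every reporter with failures isolated per reporter, and remembering View metrics so they can be updated later. A minimal sketch with hypothetical types (`SketchReporter`, `Updatable`, `FanOutRegistry`); the query service is left out, errors are swallowed rather than logged, and `updateViews()` is called by hand where Flink's ViewUpdater uses a timer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal types; Flink's real MetricReporter and View interfaces are richer.
interface SketchReporter {
    void notifyOfAddedMetric(Object metric, String name);
}

interface Updatable {
    void update();
}

class FanOutRegistry {
    // Dedicated lock object rather than synchronizing on `this`.
    private final Object lock = new Object();
    private final List<SketchReporter> reporters;
    private final List<Updatable> views = new ArrayList<>();
    private boolean shutdown;

    FanOutRegistry(List<SketchReporter> reporters) { this.reporters = reporters; }

    void register(Object metric, String name) {
        synchronized (lock) {
            if (shutdown) {
                return; // Flink logs a warning at this point
            }
            for (SketchReporter reporter : reporters) {
                try {
                    reporter.notifyOfAddedMetric(metric, name);
                } catch (Exception e) {
                    // One failing reporter must not prevent the others from being notified.
                }
            }
            if (metric instanceof Updatable) {
                views.add((Updatable) metric);
            }
        }
    }

    // Invoked periodically in a real system; called manually in this sketch.
    void updateViews() {
        synchronized (lock) {
            for (Updatable view : views) {
                view.update();
            }
        }
    }

    void shutdown() {
        synchronized (lock) {
            shutdown = true;
        }
    }
}
```

The per-reporter try/catch is the important design choice: a reporter that throws (for example because its backend is unreachable) only loses its own notification, while the other reporters and the View bookkeeping still see the metric.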
MetricReporter
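A MetricReporter exports Metrics to an external backend.
The parameters of the external backends are configured in conf/flink-conf.yaml.
Several external backends can be configured at the same time.
The MetricReporter interface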
1 public interface MetricReporter {
2
3     // ------------------------------------------------------------------------
4     //  life cycle
5     // ------------------------------------------------------------------------
6
8     void open(MetricConfig config);
9
10    void close();
11
12    // ------------------------------------------------------------------------
13    //  adding / removing metrics
14    // ------------------------------------------------------------------------
15
16    void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
17
18
19    void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
20 }
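Line 8: configure the reporter.
Because a reporter is created through its no-argument constructor, this method is where its fields get initialized.
It is always called right after the object has been constructed.
Line 10: close the reporter.
This method should close channels and streams and release any other resources.
Lines 16 and 19: notify the reporter that a metric was added or removed.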
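A reporter that follows this life cycle might look like the sketch below. `ReporterSketch` mirrors the shape of MetricReporter but is a hypothetical interface: a `java.util.Properties` stands in for `MetricConfig`, a plain scope string stands in for the `MetricGroup`, and the `prefix` option is invented for the example.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical interface with the same life cycle as MetricReporter (no Flink dependency).
interface ReporterSketch {
    void open(Properties config);
    void close();
    void notifyOfAddedMetric(Object metric, String metricName, String scope);
    void notifyOfRemovedMetric(Object metric, String metricName, String scope);
}

// Keeps the currently registered metrics in a map keyed by "<prefix><scope>.<name>".
class InMemoryReporter implements ReporterSketch {
    // Concurrent map: notifications and reporting may come from different threads.
    final Map<String, Object> metrics = new ConcurrentHashMap<>();
    private String prefix = "";

    // No-argument constructor; all configuration happens in open().
    public InMemoryReporter() {}

    @Override
    public void open(Properties config) {
        // "prefix" is a made-up option used only in this sketch.
        prefix = config.getProperty("prefix", "");
    }

    @Override
    public void close() {
        // A real reporter would close its sockets or streams here.
        metrics.clear();
    }

    @Override
    public void notifyOfAddedMetric(Object metric, String metricName, String scope) {
        metrics.put(prefix + scope + "." + metricName, metric);
    }

    @Override
    public void notifyOfRemovedMetric(Object metric, String metricName, String scope) {
        metrics.remove(prefix + scope + "." + metricName);
    }
}
```

Keeping configuration out of the constructor is what lets the runtime instantiate reporters reflectively and configure them afterwards, which is why open() is documented as always running right after construction.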
Reporters that report periodically also implement the Scheduled interface, which reports the current measurements:
1 public interface Scheduled {
2
3     void report();
4 }
Line 3: the metric registry calls report() periodically to report the current measurements.
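On the registry side, calling report() at a fixed interval can be done with a `ScheduledExecutorService`. This is a sketch under assumptions: `ReportScheduler` is a hypothetical class, `ScheduledSketch` is a stand-in shaped like Scheduled, and the interval is passed as a parameter instead of being read from flink-conf.yaml.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Same shape as the Scheduled interface above.
interface ScheduledSketch {
    void report();
}

// Hypothetical driver: the registry side that calls report() at a fixed interval.
class ReportScheduler implements AutoCloseable {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    ScheduledFuture<?> schedule(ScheduledSketch reporter, long period, TimeUnit unit) {
        return executor.scheduleWithFixedDelay(() -> {
            try {
                reporter.report();
            } catch (RuntimeException e) {
                // Swallow the error: an exception escaping this task would suppress all later runs.
            }
        }, period, period, unit);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

The try/catch inside the task matters because a `ScheduledExecutorService` silently stops rescheduling a periodic task once one execution throws; without it, a single failed report would end reporting for good.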