Ceilometer Source Code Study - Notification Agent

Introduction

Ceilometer collects data in two ways. The previous post, Ceilometer Source Code Study - Polling Agent, covered the polling approach, which actively calls other components' APIs. That obviously adds load to those components, so the more elegant and recommended approach is to have the Notification Agent listen on the message queue and collect the data it needs. This post describes the Notification Agent's function and implementation.

Requirements

The Notification Agent's job in one sentence:

Listen for notification data produced by other OpenStack components on the message queue, process it, and send the results onward.

The overall structure is shown below: Figure 1: Notification Agent structure diagram

From the figure, the Notification Agent has to do three things:

  1. listen on the message queue and collect data;
  2. process the raw data;
  3. send the processed data onward.

None of this is complex by itself, but the requirements for what data to handle, how to process it, and where to send it vary widely, and their combinations multiply quickly. To keep the Notification Agent flexible and easy to extend, Ceilometer adopts a pipeline design. In short:

Thanks to the pipeline design, each processing step exists as a plugin that can be freely combined without caring what the other plugins do. These independent, single-purpose plugins fall into three types, matching the three requirements above:

  1. Notification plugins: listen for a particular kind of notification on the message queue;
  2. Transformer plugins: convert the received data as the pipeline dictates; the conversion may be aggregation or a change of form. See the official diagram: Figure 2: Transform diagram
  3. Publisher plugins: send the transformed data to the destinations the pipeline defines.

These plugins are registered under their respective namespaces in setup.cfg and loaded dynamically at runtime via stevedore.
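The idea behind this loading mechanism can be sketched in a few lines. This is a simplified stand-in for stevedore's entry-point loading, not Ceilometer code; the `json:loads` entry is only a stdlib example so the sketch is self-contained.

```python
import importlib

# Simplified stand-in for stevedore's entry-point loading (illustrative only).
# Each entry under a namespace in setup.cfg has the form "name = module:attr".
def load_plugins(entries):
    plugins = {}
    for entry in entries:
        name, target = (part.strip() for part in entry.split('=', 1))
        module_path, attr = target.split(':')
        module = importlib.import_module(module_path)
        plugins[name] = getattr(module, attr)
    return plugins

# A stdlib target keeps the sketch runnable; Ceilometer's real entries point
# at classes such as ceilometer.transformer.conversions:RateOfChangeTransformer.
plugins = load_plugins(['loads = json:loads'])
```

The real stevedore managers add discovery via Python entry points and error handling, but the name-to-class mapping is the core of it.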

Code Details

With the overall structure and approach in mind, let's walk through the implementation at the code level.

1. Entry Point

The executable is registered under console_scripts in setup.cfg:

console_scripts =
    ceilometer-agent-notification = ceilometer.cmd.eventlet.agent_notification:main
    ...

2. ceilometer.cmd.eventlet.agent_notification

Accordingly, the entry function lives in ceilometer/cmd/eventlet/agent_notification.py:

def main():
    service.prepare_service()
    os_service.launch(CONF, notification.NotificationService(),
                      workers=CONF.notification.workers).wait()

3. Starting NotificationService

The startup code of NotificationService is in ceilometer/notification.py; its core looks like this:

def start(self):

    # load the pipelines and the transformer/publisher plugins
    self.pipeline_manager = pipeline.setup_pipeline()
    ...

    # set up and start the required message queue listeners
    self._configure_main_queue_listeners(self.pipe_manager,
                                         self.event_pipe_manager)

    ...

The Notification Agent's main work happens in the two calls above: setup_pipeline loads the pipelines, and _configure_main_queue_listeners starts the queue listeners. Let's look at each in turn.

4. Loading the Pipeline

Starting from pipeline.setup_pipeline() and following the call chain, the pipeline file is loaded in PipelineManager's __init__ method. A sample pipeline.yaml helps illustrate the loading process:

# pipeline.yaml example
sources:
- name: disk_source
  interval: 600
  meters:
      - "disk.read.bytes"
      - "disk.read.requests"
      - "disk.write.bytes"
      - "disk.write.requests"
      - "disk.device.read.bytes"
      - "disk.device.read.requests"
      - "disk.device.write.bytes"
      - "disk.device.write.requests"
  sinks:
      - disk_sink
- name: network_source
  interval: 600
  meters:
      - "network.incoming.bytes"
      - "network.incoming.packets"
      - "network.outgoing.bytes"
      - "network.outgoing.packets"
  sinks:
      - network_sink

sinks:
- name: disk_sink
  transformers:
      - name: "rate_of_change"
        parameters:
            source:
                map_from:
                    name: "(disk\\.device|disk)\\.(read|write)\\.(bytes|requests)"
                    unit: "(B|request)"
            target:
                map_to:
                    name: "\\1.\\2.\\3.rate"
                    unit: "\\1/s"
                type: "gauge"
  publishers:
      - notifier://
- name: network_sink
  transformers:
      ...
  publishers:
      - notifier://

As the example shows, the pipeline file has two parts: sources, which select the meters to collect, and sinks, which define how samples are transformed and where they are published. PipelineManager parses both:

class PipelineManager(object):
    def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE):
        self.pipelines = []
        ...
        # parse sources and wrap each as a SampleSource
        unique_names = set()
        sources = []
        for s in cfg.get('sources', []):
            name = s.get('name')
            if name in unique_names:
                raise PipelineException("Duplicated source names: %s" %
                                        name, self)
            else:
                unique_names.add(name)
                sources.append(p_type['source'](s))
        unique_names.clear()

        # parse sinks and wrap each as a SampleSink
        sinks = {}
        for s in cfg.get('sinks', []):
            name = s.get('name')
            if name in unique_names:
                raise PipelineException("Duplicated sink names: %s" %
                                        name, self)
            else:
                unique_names.add(name)
                sinks[s['name']] = p_type['sink'](s, transformer_manager)
        unique_names.clear()

        # combine the loaded SampleSources and SampleSinks into SamplePipelines
        for source in sources:
            source.check_sinks(sinks)
            for target in source.sinks:
                pipe = p_type['pipeline'](source, sinks[target])
                if pipe.name in unique_names:
                    raise PipelineException(
                        "Duplicate pipeline name: %s. Ensure pipeline"
                        " names are unique. (name is the source and sink"
                        " names combined)" % pipe.name, cfg)
                else:
                    unique_names.add(pipe.name)
                    self.pipelines.append(pipe)
        unique_names.clear()

The PipelineManager initializer above parses the sources into SampleSource objects, parses the sinks into SampleSink objects, and then combines every source with each of its sinks into a SamplePipeline. The details can be found in the __init__ methods of Source, Sink, and Pipeline.
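The pairing step can be boiled down to a few lines. The dict structures below are hypothetical simplifications, not Ceilometer's actual classes, but the source-times-sink combination and the "source and sink names combined" naming follow the code above.

```python
# Stripped-down sketch of the source/sink pairing in PipelineManager.__init__,
# using plain dicts in place of SampleSource/SampleSink/SamplePipeline.
cfg = {
    'sources': [{'name': 'disk_source', 'sinks': ['disk_sink']},
                {'name': 'network_source', 'sinks': ['network_sink']}],
    'sinks': [{'name': 'disk_sink'}, {'name': 'network_sink'}],
}

sinks = {s['name']: s for s in cfg['sinks']}
pipelines = []
for source in cfg['sources']:
    for target in source['sinks']:
        if target not in sinks:
            raise ValueError('Undefined sink: %s' % target)
        # a pipeline's name is the source and sink names combined
        pipelines.append('%s:%s' % (source['name'], target))
```

With the example pipeline.yaml above, this yields one pipeline per source/sink pair, e.g. disk_source:disk_sink.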

5. Listening on the Message Queue

Back in NotificationService, let's see how _configure_main_queue_listeners loads the plugins and starts listening on the message queue.

def _configure_main_queue_listeners(self, pipe_manager,
                                    event_pipe_manager):
    # load the Notification plugins
    notification_manager = self._get_notifications_manager(pipe_manager)
    if not list(notification_manager):
        LOG.warning(_('Failed to load any notification handlers for %s'),
                    self.NOTIFICATION_NAMESPACE)
    ...

    # call each plugin's get_targets to collect all targets to listen on
    endpoints = []
    targets = []
    for ext in notification_manager:
        handler = ext.obj
        if (cfg.CONF.notification.disable_non_metric_meters and
                isinstance(handler, base.NonMetricNotificationBase)):
            continue

        for new_tar in handler.get_targets(cfg.CONF):
            if new_tar not in targets:
                targets.append(new_tar)
    # register the listeners and start them
    urls = cfg.CONF.notification.messaging_urls or [None]
    for url in urls:
        transport = messaging.get_transport(url)
        listener = messaging.get_notification_listener(
            transport, targets, endpoints)
        listener.start()

For more detail, see the oslo.messaging Notification Listener documentation.
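To make the endpoint side concrete, here is a toy dispatcher mimicking how a notification listener hands matching messages to endpoint methods. The InstanceEndpoint class, its event_types filter, and the wildcard matching are illustrative stand-ins, though the info() signature follows oslo.messaging's notification endpoint convention.

```python
import fnmatch

# Toy endpoint shaped like an oslo.messaging notification endpoint;
# the event_types list and return value are illustrative only.
class InstanceEndpoint(object):
    event_types = ['compute.instance.*']

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        # a real Ceilometer endpoint would build Samples here
        # and push them into the pipeline
        return 'sample from %s' % event_type

def dispatch(endpoints, event_type, payload):
    # hand the message to every endpoint whose filter matches its event_type
    results = []
    for ep in endpoints:
        if any(fnmatch.fnmatch(event_type, pat) for pat in ep.event_types):
            results.append(ep.info({}, 'compute', event_type, payload, {}))
    return results
```

oslo.messaging performs this routing internally once listener.start() is called; the plugin only supplies targets and endpoint methods.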

6. Notification Plugins

With loading and startup done, the whole Notification Agent runs in an orderly fashion. Next, let's look at what happens inside each plugin, starting with the Notification plugins, which listen for a particular kind of notification on the queue. Take the Instance plugin, which collects nova instance state information from the message queue; its registration in setup.cfg:

ceilometer.notification =
    instance = ceilometer.compute.notifications.instance:Instance

Such a plugin converts each matching notification into Samples and hands them to the pipeline; the code that finally invokes the Transformer and Publisher plugins is shown below:

def _transform_sample(self, start, ctxt, sample):
    try:
        for transformer in self.transformers[start:]:
            sample = transformer.handle_sample(ctxt, sample)
            if not sample:
                return
        return sample
    except Exception as err:
        LOG.warning(...)
        LOG.exception(err)

def _publish_samples(self, start, ctxt, samples):

    # run each sample through the Transformers' handle_sample
    transformed_samples = []
    if not self.transformers:
        transformed_samples = samples
    else:
        for sample in samples:
            sample = self._transform_sample(start, ctxt, sample)
            if sample:
                transformed_samples.append(sample)
    # hand the transformed samples to each Publisher's publish_samples
    if transformed_samples:
        for p in self.publishers:
            try:
                p.publish_samples(ctxt, transformed_samples)
            except Exception:
                LOG.exception(...)

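The chaining in _transform_sample reduces to a simple pattern: each transformer may rewrite the sample or drop it entirely by returning None, which short-circuits the chain. A minimal sketch, with placeholder callables standing in for real transformers:

```python
# Minimal sketch of transformer chaining as done in _transform_sample.
def transform(transformers, sample):
    for t in transformers:
        sample = t(sample)
        if sample is None:
            return None  # dropped: later transformers never see it
    return sample

double = lambda s: s * 2                        # placeholder "transformer"
drop_odd = lambda s: s if s % 2 == 0 else None  # placeholder filter
```

Note that order matters: drop_odd followed by double discards an odd sample before double ever runs.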
7. Transformer Plugins

Transformer plugins, which handle data format conversion, have come up twice so far: in the transformers section of a pipeline sink, and in the _transform_sample call above.

Let's look at RateOfChangeTransformer; its registration in setup.cfg:

ceilometer.transformer =
    rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer

Its implementation is in ceilometer/transformer/conversions.py:

def handle_sample(self, context, s):
    """Handle a sample, converting if necessary."""
    key = s.name + s.resource_id
    prev = self.cache.get(key)
    timestamp = timeutils.parse_isotime(s.timestamp)
    self.cache[key] = (s.volume, timestamp)

    if prev:
        prev_volume = prev[0]
        prev_timestamp = prev[1]
        time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
        if time_delta < 0:
            LOG.warn(_('dropping out of time order sample: %s'), (s,))
            # Reset the cache to the newer sample.
            self.cache[key] = prev
            return None
        volume_delta = (s.volume - prev_volume
                        if (prev_volume <= s.volume or
                            s.type != sample.TYPE_CUMULATIVE)
                        else s.volume)
        rate_of_change = ((1.0 * volume_delta / time_delta)
                          if time_delta else 0.0)

        s = self._convert(s, rate_of_change)
    else:
        LOG.warn(_('dropping sample with no predecessor: %s'),
                 (s,))
        s = None
    return s
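A hand-worked example of the logic above, using plain values (made up for illustration) in place of Ceilometer's Sample objects:

```python
from datetime import datetime

# Two cumulative disk.read.bytes readings taken 10 minutes apart
# (hypothetical values, not real meter output).
prev_volume, prev_ts = 1000.0, datetime(2016, 1, 1, 0, 0, 0)
curr_volume, curr_ts = 4000.0, datetime(2016, 1, 1, 0, 10, 0)

time_delta = (curr_ts - prev_ts).total_seconds()  # 600.0 seconds
volume_delta = curr_volume - prev_volume          # 3000.0 bytes
rate_of_change = (volume_delta / time_delta) if time_delta else 0.0
# per the pipeline.yaml mapping, the sample would be renamed
# disk.read.bytes.rate with unit B/s
```

This also shows why the first sample of a resource is dropped ("no predecessor"): without a cached previous reading there is nothing to diff against.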

8. Publisher Plugins

Publisher plugins, likewise, appeared both when loading the pipeline file and when sending data. Take SampleNotifierPublisher, which publishes data to the message queue: a sink that defines "publishers: - notifier://" in the pipeline ultimately sends its data through this plugin. Its registration in setup.cfg:

ceilometer.publisher =
    notifier = ceilometer.publisher.messaging:SampleNotifierPublisher

Its implementation is in ceilometer/publisher/messaging.py:

class NotifierPublisher(MessagingPublisher):
    def __init__(self, parsed_url, default_topic):
        super(NotifierPublisher, self).__init__(parsed_url)
        options = urlparse.parse_qs(parsed_url.query)
        topic = options.get('topic', [default_topic])[-1]
        self.notifier = oslo_messaging.Notifier(
            messaging.get_transport(),
            driver=cfg.CONF.publisher_notifier.telemetry_driver,
            publisher_id='telemetry.publisher.%s' % cfg.CONF.host,
            topic=topic,
            retry=self.retry
        )

    def _send(self, context, event_type, data):
        try:
            self.notifier.sample(context.to_dict(), event_type=event_type,
                                 payload=data)
        except oslo_messaging.MessageDeliveryFailure as e:
            raise_delivery_failure(e)


class SampleNotifierPublisher(NotifierPublisher):
    def __init__(self, parsed_url):
        super(SampleNotifierPublisher, self).__init__(
            parsed_url, cfg.CONF.publisher_notifier.metering_topic)  # metering_topic defaults to "metering"
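The topic override in NotifierPublisher.__init__ comes from the publisher URL's query string. A quick illustration of that parse_qs logic, using Python 3's urllib in place of the six/urlparse import the original code relies on:

```python
from urllib import parse as urlparse

# A "notifier://?topic=..." entry in pipeline.yaml overrides the default topic,
# mirroring the options/topic lines in NotifierPublisher.__init__.
parsed_url = urlparse.urlparse('notifier://?topic=custom_metering')
options = urlparse.parse_qs(parsed_url.query)
topic = options.get('topic', ['metering'])[-1]  # 'metering' is the default
```

The [-1] means that if the same option is given several times, the last occurrence wins.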

9. Event Data

It should be noted that the Notification Agent can handle the data it receives in two ways. What's described above is the Sample path, suited to numeric data. The corresponding Event path suits event data, such as a disk being created or an instance being deleted.

References

  - Official documentation: Ceilometer Architecture
  - GitHub: Ceilometer source code
