Each Heron topology has its own centralized Metrics Manager (MM), which collects metrics from all instances in the topology. You can define how the MM processes metrics by implementing a metrics sink, which specifies how the MM handles incoming MetricsRecord objects.

Java is currently the only supported language for custom metrics sinks. This may change in the future.

Currently-supported Sinks

Heron comes equipped out of the box with three metrics sinks that you can apply for a specific topology. The code for these sinks may prove helpful for implementing your own.

  • GraphiteSink — Sends each MetricsRecord object to a Graphite instance according to a Graphite prefix.
  • ScribeSink — Sends each MetricsRecord object to a Scribe instance according to a Scribe category and namespace.
  • FileSink — Writes each MetricsRecord object to a JSON file at a specified path.

More on using those sinks in a Heron cluster can be found in Metrics Manager.

Java Setup

In order to create a custom metrics sink, you need to import the heron-spi library into your project.

Maven

com.twitter.heron heron-spi 0.14.0

Gradle

dependencies { compile group: "com.twitter.heron", name: "heron-spi", version: "0.14.0" }

The IMetricsSink Interface

Each metrics sink must implement the IMetricsSink interface, which requires you to implement the following methods:

  • void init(Map<String, Object> conf, SinkContext context) — Defines the initialization behavior of the sink. The conf map is the configuration that is passed to the sink by the .yaml configuration file at heron/config/metrics_sink.yaml; the SinkContext object enables you to access values from the sink’s runtime context (the ID of the metrics manager, the ID of the sink, and the name of the topology).
  • void processRecord(MetricsRecord record) — Defines how each MetricsRecord that passes through the sink is processed.
  • void flush() — Flush any buffered metrics; this function is called at the interval specified by the flush-frequency-ms. More info can be found in the Stream Manager document.
  • void close() — Closes the stream and releases any system resources associated with it; if the stream is already closed, invoking close() has no effect.

Your implementation of those interfaces will need to be packaged into a JAR file and distributed to the metrics-mgr-classpath folder of your Heron release.

Example Implementation

Below is an example implementation that simply prints the contents of each metrics record as it passes through:

import com.twitter.heron.metricsmgr.api.metrics.MetricsInfo; import com.twitter.heron.metricsmgr.api.metrics.MetricsRecord; import com.twitter.heron.metricsmgr.api.sink.IMetricsSink; import com.twitter.heron.metricsmgr.api.sink.SinkContext; public class PrintSink implements IMetricsSink { @Override public void init(Map conf, SinkContext context) { System.out.println("Sink configuration:"); // This will print out each config in the supplied configuration for (Map.Entry config : conf.entrySet()) { System.out.println(String.format("%s: %s", config.getKey(), config.getValue()); } System.out.println(String.format("Topology name: %s", context.getTopologyName()); System.out.println(String.format("Sink ID: %s", context.getSinkId())); } @Override public void processRecord(MetricsRecord record) { String recordString = String.format("Record received: %s", record.toString()); System.out.println(recordString); } @Override public void flush() { // Since we're just printing to stdout in this sink, we don't need to // specify any flush() behavior } @Override public void close() { // Since we're just printing to stdout in this sink, we don't need to // specify any close() behavior } }

Configuring Your Custom Sink

The configuration for your sink needs to be provided in the YAML file at heron/config/src/yaml/conf/${CLUSTER}/metrics_sinks.yaml.

At the top of that file there’s a sinks parameter that lists each available sink by name. You should add your sink to that list. Here’s an example:

sinks: - file-sink - scribe-sink - tmaster-sink - print-sink

For each sink you are required to specify the followings:

  • class — The Java class name of your custom implementation of the IMetricsSink interface, e.g. biz.acme.heron.metrics.PrintSink.
  • flush-frequency-ms — The frequency (in milliseconds) at which the flush() method is called in your implementation of IMetricsSink.
  • sink-restart-attempts — The number of times that a sink will attempt to restart if it throws exceptions and dies. If you do not set this, the default is 0; if you set it to -1, the sink will attempt to restart forever.

Below is an example metrics_sink.yaml configuration:

sinks: - print-sink print-sink: class: "biz.acme.heron.metrics.PrintSink" flush-frequency-ms: 60000 # One minute sink-restart-attempts: -1 # Attempt to restart forever

It is optional to add other configurations for the sink. All configurations will be constructed as an unmodifiable map Map<String, Object> conf and passed to init(conf, context).

Using Your Custom Sink

Once you’ve made a JAR for your custom Java sink, distributed that JAR to metrics-mgr-classpath folder, and changed the configuration in heron/config/src/yaml/conf/${CLUSTER}/metrics_sinks.yaml. Any topology submitted using that configuration will include the custom sink.You must re-compile Heron if you want to include the configuration in a new heron-cli distribution.