http://blog.csdn.net/eric_sunah/article/details/44980385?utm_source=tuicool
  
  
 
  
  
 
    
  使用JMX监控Kafka
 
  
  
  
   标签: KafkaJMX监控
   
   
   2015-04-10 15:43 
   2952人阅读 
    
   收藏 
   举报 
   
  
  
   
    
   分类: 
   
  
   Kafka(8) 
   
   
  
  
  
版权声明:本文为博主原创文章,未经博主允许不得转载。
 
  
  
  
  目录(?)[+]
 
  
  
  
  
   Kafka可以配置使用JMX进行运行状态的监控,既可以通过JDK自带Jconsole来观察结果,也可以通过Java API的方式来.
   
  
   关于监控指标的描述,可以参考:
   http://kafka.apache.org/documentation.html#monitoring 
   
  
   
   
   
   
 开启JMX端口
 
   
  
   修改bin/kafka-server-start.sh,添加JMX_PORT参数,添加后样子如下
   
   
    
     
     
[html] 
     view plain
      copy 
      
      print
     ? 
     
    
   
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then      export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"      export JMX_PORT="9999"  fi  
   
  
  
   
    通过Jconsole测试时候可以连接
 
   
   
   
   
   
   
   
  
   
   
  
   
   
   
    通过JavaAPI来访问
 
   
  
   
   
  
   通过以下方法获取目标值
   
   
    
     
     
[java] 
     view plain
      copy 
      
      print
     ?
     
      
     
    
   public class KafkaDataProvider{      protected final Logger LOGGER = LoggerFactory.getLogger(getClass());      private static final String MESSAGE_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";      private static final String BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";      private static final String BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec";      private static final String PRODUCE_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce";      private static final String CONSUMER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer";      private static final String FLOWER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower";      private static final String ACTIVE_CONTROLLER_COUNT = "kafka.controller:type=KafkaController,name=ActiveControllerCount";      private static final String PART_COUNT = "kafka.server:type=ReplicaManager,name=PartitionCount";      public String extractMonitorData() {                    KafkaRoleInfo monitorDataPoint = new KafkaRoleInfo();          String jmxURL = "service:jmx:rmi:///jndi/rmi://192.168.40.242:9999/jmxrmi";          try {              MBeanServerConnection jmxConnection = MetricDataUtils.getMBeanServerConnection(jmxURL);              ObjectName messageCountObj = new ObjectName(MESSAGE_IN_PER_SEC);              ObjectName bytesInPerSecObj = new ObjectName(BYTES_IN_PER_SEC);              ObjectName bytesOutPerSecObj = new ObjectName(BYTES_OUT_PER_SEC);              ObjectName produceRequestsPerSecObj = new ObjectName(PRODUCE_REQUEST_PER_SEC);              ObjectName consumerRequestsPerSecObj = new ObjectName(CONSUMER_REQUEST_PER_SEC);              ObjectName flowerRequestsPerSecObj = new ObjectName(FLOWER_REQUEST_PER_SEC);              ObjectName activeControllerCountObj = new ObjectName(ACTIVE_CONTROLLER_COUNT);              ObjectName partCountObj = new ObjectName(PART_COUNT);              Long messagesInPerSec = (Long) jmxConnection.getAttribute(messageCountObj, "Count");              Long bytesInPerSec = (Long) jmxConnection.getAttribute(bytesInPerSecObj, "Count");              Long bytesOutPerSec = (Long) jmxConnection.getAttribute(bytesOutPerSecObj, "Count");              Long produceRequestCountPerSec = (Long) jmxConnection.getAttribute(produceRequestsPerSecObj, "Count");              Long consumerRequestCountPerSec = (Long) jmxConnection.getAttribute(consumerRequestsPerSecObj, "Count");              Long flowerRequestCountPerSec = (Long) jmxConnection.getAttribute(flowerRequestsPerSecObj, "Count");              Integer activeControllerCount = (Integer) jmxConnection.getAttribute(activeControllerCountObj, "Value");              Integer partCount = (Integer) jmxConnection.getAttribute(partCountObj, "Value");              monitorDataPoint.setMessagesInPerSec(messagesInPerSec);              monitorDataPoint.setBytesInPerSec(bytesInPerSec);              monitorDataPoint.setBytesOutPerSec(bytesOutPerSec);              monitorDataPoint.setProduceRequestCountPerSec(produceRequestCountPerSec);              monitorDataPoint.setConsumerRequestCountPerSec(consumerRequestCountPerSec);              monitorDataPoint.setFlowerRequestCountPerSec(flowerRequestCountPerSec);              monitorDataPoint.setActiveControllerCount(activeControllerCount);              monitorDataPoint.setPartCount(partCount);          } catch (IOException e) {              e.printStackTrace();          } catch (MalformedObjectNameException e) {              e.printStackTrace();          } catch (AttributeNotFoundException e) {              e.printStackTrace();          } catch (MBeanException e) {              e.printStackTrace();          } catch (ReflectionException e) {              e.printStackTrace();          } catch (InstanceNotFoundException e) {              e.printStackTrace();          }          return monitorDataPoint.toString();      }      public static void main(String[] args) {          System.out.println(new KafkaDataProvider().extractMonitorData());      }                  public MBeanServerConnection getMBeanServerConnection(String jmxUrl) throws IOException {          JMXServiceURL url = new JMXServiceURL(jmxUrl);          JMXConnector jmxc = JMXConnectorFactory.connect(url, null);          MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();          return mbsc;      }  }  
   
  
   其他工具 
 
  除了自己编写定制化的监控程序外
 
   
   
    kafka-web-console
 https://github.com/claudemamo/kafka-web-console
    部署sbt:
    http://www.scala-sbt.org/0.13/tutorial/Manual-Installation.html
    http://www.scala-sbt.org/release/tutorial/zh-cn/Installing-sbt-on-Linux.html
   
   
   
 KafkaOffsetMonitor
 https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.0
    java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost:12181 --port 8080 --refresh 5.minutes --retain 1.day
   
   
   
 Mx4jLoader
 
   
  
                
        
 
相关资源:zabbix监控之kafka模板