How to get Kafka Consumer connection states

If you want to gather information about the interaction of Kafka Consumer with Kafka Broker, you may want to use this method:

public String readConsumerInfo(KafkaConsumer<String, String> consumer) {
    try {
        ConsumerNetworkClient consumerNetworkClient = getKafkaConsumerNetworkClient(consumer);
        KafkaClient kafkaClient = getKafkaClient(consumerNetworkClient);
        Object kafkaClientConnectionStates = getKafkaClientConnectionStates(kafkaClient);
        String result = kafkaConnectionStatesToString(kafkaClientConnectionStates);
        return String.format("----- %s description: %s", readConsumerClientId(consumer), result);
    } catch (Exception ex) {
        ex.printStackTrace();
        return null;
    }
}

Helper methods:

private ConsumerNetworkClient getKafkaConsumerNetworkClient(KafkaConsumer<String, String> consumer) throws Exception {
    Field field = consumer.getClass().getDeclaredField("client");
    field.setAccessible(true);
    return (ConsumerNetworkClient) field.get(consumer);
}

private KafkaClient getKafkaClient(ConsumerNetworkClient consumerNetworkClient) throws Exception {
    Field field = consumerNetworkClient.getClass().getDeclaredField("client");
    field.setAccessible(true);
    return (KafkaClient) field.get(consumerNetworkClient);
}

private Object getKafkaClientConnectionStates(KafkaClient kafkaClient) throws Exception {
    Field field = kafkaClient.getClass().getDeclaredField("connectionStates");
    field.setAccessible(true);
    return field.get(kafkaClient);
}

private String kafkaConnectionStatesToString(Object kafkaClusterConnectionStates) throws Exception {
    Field field = kafkaClusterConnectionStates.getClass().getDeclaredField("nodeState");
    field.setAccessible(true);
    Map<String, Object> connectionStateMap = (Map<String, Object>) field.get(kafkaClusterConnectionStates);
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, Object> connectionState : connectionStateMap.entrySet()) {
        sb.append("\n").append("Id: ").append(connectionState.getKey()).append(", ").append(kafkaNodeConnectionStateToString(connectionState.getValue()));
    }
    return sb.toString();
}

/**
  * You may want to read more fields from KafkaNodeConnectionState class
  */
private String kafkaNodeConnectionStateToString(Object kafkaNodeConnectionState) throws Exception {
    Field stateField = kafkaNodeConnectionState.getClass().getDeclaredField("state");
    stateField.setAccessible(true);
    ConnectionState state = (ConnectionState) stateField.get(kafkaNodeConnectionState);

    Field hostField = kafkaNodeConnectionState.getClass().getDeclaredField("host");
    hostField.setAccessible(true);
    String host = (String) hostField.get(kafkaNodeConnectionState);

    Field authenticationExceptionField = kafkaNodeConnectionState.getClass().getDeclaredField("authenticationException");
    authenticationExceptionField.setAccessible(true);
    AuthenticationException authenticationException = (AuthenticationException) authenticationExceptionField.get(kafkaNodeConnectionState);

    Field failedAttemptsField = kafkaNodeConnectionState.getClass().getDeclaredField("failedAttempts");
    failedAttemptsField.setAccessible(true);
    Long failedAttempts = (Long) failedAttemptsField.get(kafkaNodeConnectionState);

    Field failedConnectAttemptsField = kafkaNodeConnectionState.getClass().getDeclaredField("failedConnectAttempts");
    failedConnectAttemptsField.setAccessible(true);
    Long failedConnectAttempts = (Long) failedConnectAttemptsField.get(kafkaNodeConnectionState);

    return String.format("state: %s, host: %s, authenticationException: %s, failedAttempts: %s, failedConnectAttempts: %s)",
            state, host, authenticationException, failedAttempts, failedConnectAttempts);
}

private String readConsumerClientId(KafkaConsumer<String, String> consumer) {
    try {
        Method method = consumer.getClass().getDeclaredMethod("getClientId");
        method.setAccessible(true);
        return (String) method.invoke(consumer);
    } catch (Exception ex) {
        ex.printStackTrace();
        return null;
    }
}
Telegram channel

If you still have any questions, feel free to ask me in the comments under this article or write me at promark33@gmail.com.

If I saved your day, you can support me 🤝

Leave a Reply

Your email address will not be published. Required fields are marked *