If you want to gather information about the interaction of Kafka Consumer with Kafka Broker, you may want to use this method:
public String readConsumerInfo(KafkaConsumer<String, String> consumer) {
try {
ConsumerNetworkClient consumerNetworkClient = getKafkaConsumerNetworkClient(consumer);
KafkaClient kafkaClient = getKafkaClient(consumerNetworkClient);
Object kafkaClientConnectionStates = getKafkaClientConnectionStates(kafkaClient);
String result = kafkaConnectionStatesToString(kafkaClientConnectionStates);
return String.format("----- %s description: %s", readConsumerClientId(consumer), result);
} catch (Exception ex) {
ex.printStackTrace();
return null;
}
}
Helper methods:
private ConsumerNetworkClient getKafkaConsumerNetworkClient(KafkaConsumer<String, String> consumer) throws Exception {
Field field = consumer.getClass().getDeclaredField("client");
field.setAccessible(true);
return (ConsumerNetworkClient) field.get(consumer);
}
private KafkaClient getKafkaClient(ConsumerNetworkClient consumerNetworkClient) throws Exception {
Field field = consumerNetworkClient.getClass().getDeclaredField("client");
field.setAccessible(true);
return (KafkaClient) field.get(consumerNetworkClient);
}
private Object getKafkaClientConnectionStates(KafkaClient kafkaClient) throws Exception {
Field field = kafkaClient.getClass().getDeclaredField("connectionStates");
field.setAccessible(true);
return field.get(kafkaClient);
}
private String kafkaConnectionStatesToString(Object kafkaClusterConnectionStates) throws Exception {
Field field = kafkaClusterConnectionStates.getClass().getDeclaredField("nodeState");
field.setAccessible(true);
Map<String, Object> connectionStateMap = (Map<String, Object>) field.get(kafkaClusterConnectionStates);
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, Object> connectionState : connectionStateMap.entrySet()) {
sb.append("\n").append("Id: ").append(connectionState.getKey()).append(", ").append(kafkaNodeConnectionStateToString(connectionState.getValue()));
}
return sb.toString();
}
/**
* You may want to read more fields from KafkaNodeConnectionState class
*/
private String kafkaNodeConnectionStateToString(Object kafkaNodeConnectionState) throws Exception {
Field stateField = kafkaNodeConnectionState.getClass().getDeclaredField("state");
stateField.setAccessible(true);
ConnectionState state = (ConnectionState) stateField.get(kafkaNodeConnectionState);
Field hostField = kafkaNodeConnectionState.getClass().getDeclaredField("host");
hostField.setAccessible(true);
String host = (String) hostField.get(kafkaNodeConnectionState);
Field authenticationExceptionField = kafkaNodeConnectionState.getClass().getDeclaredField("authenticationException");
authenticationExceptionField.setAccessible(true);
AuthenticationException authenticationException = (AuthenticationException) authenticationExceptionField.get(kafkaNodeConnectionState);
Field failedAttemptsField = kafkaNodeConnectionState.getClass().getDeclaredField("failedAttempts");
failedAttemptsField.setAccessible(true);
Long failedAttempts = (Long) failedAttemptsField.get(kafkaNodeConnectionState);
Field failedConnectAttemptsField = kafkaNodeConnectionState.getClass().getDeclaredField("failedConnectAttempts");
failedConnectAttemptsField.setAccessible(true);
Long failedConnectAttempts = (Long) failedConnectAttemptsField.get(kafkaNodeConnectionState);
return String.format("state: %s, host: %s, authenticationException: %s, failedAttempts: %s, failedConnectAttempts: %s)",
state, host, authenticationException, failedAttempts, failedConnectAttempts);
}
private String readConsumerClientId(KafkaConsumer<String, String> consumer) {
try {
Method method = consumer.getClass().getDeclaredMethod("getClientId");
method.setAccessible(true);
return (String) method.invoke(consumer);
} catch (Exception ex) {
ex.printStackTrace();
return null;
}
}
If you still have any questions, feel free to ask me in the comments under this article, or write me on promark33@gmail.com.