get(@RequestParam String path){
+ return zookeeperUtils.getNodeData(path);
}
@GetMapping("/connstate")
diff --git a/src/main/java/org/gnuhpc/bigdata/exception/CollectorException.java b/src/main/java/org/gnuhpc/bigdata/exception/CollectorException.java
new file mode 100644
index 0000000..b68974f
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/exception/CollectorException.java
@@ -0,0 +1,33 @@
+package org.gnuhpc.bigdata.exception;
+
+public class CollectorException extends Exception {
+ public CollectorException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public CollectorException(String message) {
+ super(message);
+ }
+
+ public CollectorException(Throwable cause) {
+ super(cause);
+ }
+
+ public CollectorException() {
+ super();
+ }
+
+ public String catchStackTrace() {
+ String stackTraceString = "";
+ StackTraceElement[] stackElements = this.getStackTrace();
+ if (stackElements != null) {
+ for (int i = 0; i < stackElements.length; i++) {
+ stackTraceString = stackTraceString + stackElements[i].getClassName()+"\\/t";
+ stackTraceString = stackTraceString + stackElements[i].getFileName()+"\\/t";
+ stackTraceString = stackTraceString + stackElements[i].getLineNumber()+"\\/t";
+ stackTraceString = stackTraceString + stackElements[i].getMethodName()+"\\/t";
+ }
+ }
+ return stackTraceString;
+ }
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/exception/GlobalExceptionHandler.java b/src/main/java/org/gnuhpc/bigdata/exception/GlobalExceptionHandler.java
new file mode 100644
index 0000000..b9f06c8
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/exception/GlobalExceptionHandler.java
@@ -0,0 +1,342 @@
+package org.gnuhpc.bigdata.exception;
+
+import lombok.extern.log4j.Log4j;
+import org.springframework.beans.ConversionNotSupportedException;
+import org.springframework.beans.TypeMismatchException;
+import org.springframework.http.*;
+import org.springframework.http.converter.HttpMessageNotReadableException;
+import org.springframework.http.converter.HttpMessageNotWritableException;
+import org.springframework.util.CollectionUtils;
+import org.springframework.validation.BindException;
+import org.springframework.web.HttpMediaTypeNotAcceptableException;
+import org.springframework.web.HttpMediaTypeNotSupportedException;
+import org.springframework.web.HttpRequestMethodNotSupportedException;
+import org.springframework.web.bind.MethodArgumentNotValidException;
+import org.springframework.web.bind.MissingPathVariableException;
+import org.springframework.web.bind.MissingServletRequestParameterException;
+import org.springframework.web.bind.ServletRequestBindingException;
+import org.springframework.web.bind.annotation.ControllerAdvice;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.context.request.ServletWebRequest;
+import org.springframework.web.context.request.WebRequest;
+import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
+import org.springframework.web.multipart.support.MissingServletRequestPartException;
+import org.springframework.web.servlet.NoHandlerFoundException;
+import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler;
+import org.springframework.web.util.WebUtils;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.validation.ConstraintViolationException;
+import java.util.List;
+import java.util.Set;
+
+@Log4j
+@ControllerAdvice
+public class GlobalExceptionHandler extends ResponseEntityExceptionHandler {
+ /**
+ * A single place to customize the response body of all Exception types.
+ * The default implementation sets the {@link WebUtils#ERROR_EXCEPTION_ATTRIBUTE}
+ * request attribute and creates a {@link ResponseEntity} from the given
+ * body, headers, and status.
+ * @param ex the exception
+ * @param body the body for the response
+ * @param headers the headers for the response
+ * @param status the response status
+ * @param request the current request
+ */
+ @Override
+ protected ResponseEntity handleExceptionInternal(Exception ex, Object body,
+ HttpHeaders headers, HttpStatus status, WebRequest request) {
+
+ if (HttpStatus.INTERNAL_SERVER_ERROR.equals(status)) {
+ request.setAttribute(WebUtils.ERROR_EXCEPTION_ATTRIBUTE, ex, WebRequest.SCOPE_REQUEST);
+ }
+ String error = "Internal Server Error";
+ return buildResponseEntity(new RestErrorResponse(HttpStatus.INTERNAL_SERVER_ERROR, error ,ex));
+ }
+
+ /**
+ * Customize the response for HttpRequestMethodNotSupportedException.
+ * This method logs a warning, sets the "Allow" header.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param webRequest the current request
+ * @return the RestErrorResponse Object
+ */
+ @Override
+ protected ResponseEntity handleHttpRequestMethodNotSupported(HttpRequestMethodNotSupportedException ex,
+ HttpHeaders headers, HttpStatus status, WebRequest webRequest) {
+ pageNotFoundLogger.warn(ex.getMessage());
+
+ ServletWebRequest servletRequest = (ServletWebRequest) webRequest;
+ HttpServletRequest request = servletRequest.getNativeRequest(HttpServletRequest.class);
+ StringBuilder builder = new StringBuilder();
+ builder.append("Request method: " + request.getMethod()+ " is not supported. Supported Methods: ");
+ Set supportedMethods = ex.getSupportedHttpMethods();
+ supportedMethods.forEach(m -> builder.append(m).append(", "));
+
+ if (!CollectionUtils.isEmpty(supportedMethods)) {
+ headers.setAllow(supportedMethods);
+ }
+ return buildResponseEntity(new RestErrorResponse(HttpStatus.METHOD_NOT_ALLOWED, builder.substring(0, builder.length() - 2), ex));
+ }
+
+ /**
+ * Customize the response for HttpMediaTypeNotSupportedException.
+ * This method sets the "Accept" header.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param request the current request
+ * @return the RestErrorResponse Object
+ */
+ @Override
+ protected ResponseEntity handleHttpMediaTypeNotSupported(HttpMediaTypeNotSupportedException ex,
+ HttpHeaders headers, HttpStatus status, WebRequest request) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(ex.getContentType());
+ builder.append(" media type is not supported. Supported media types: ");
+ List mediaTypes = ex.getSupportedMediaTypes();
+ mediaTypes.forEach(t -> builder.append(t).append(", "));
+
+ if (!CollectionUtils.isEmpty(mediaTypes)) {
+ headers.setAccept(mediaTypes);
+ }
+
+ return buildResponseEntity(new RestErrorResponse(HttpStatus.UNSUPPORTED_MEDIA_TYPE,
+ builder.substring(0, builder.length() - 2), ex));
+ }
+
+ /**
+ * Customize the response for HttpMediaTypeNotAcceptableException.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param request the current request
+ * @return the RestErrorResponse Object
+ */
+ @Override
+ protected ResponseEntity handleHttpMediaTypeNotAcceptable(HttpMediaTypeNotAcceptableException ex,
+ HttpHeaders headers, HttpStatus status, WebRequest request) {
+ String error = "Media Type not Acceptable";
+ return buildResponseEntity(new RestErrorResponse(HttpStatus.NOT_ACCEPTABLE, error ,ex));
+ }
+
+ /**
+ * Customize the response for MissingPathVariableException.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param request the current request
+ * @return the RestErrorResponse Object
+ * @since 4.2
+ */
+ @Override
+ protected ResponseEntity handleMissingPathVariable(MissingPathVariableException ex,
+ HttpHeaders headers, HttpStatus status, WebRequest request) {
+ String error = "Path Variable : " + ex.getVariableName() + " is missing";
+ return buildResponseEntity(new RestErrorResponse(HttpStatus.BAD_REQUEST, error, ex));
+ }
+
+ /**
+ * Customize the response for MissingServletRequestParameterException.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param request the current request
+ * @return the RestErrorResponse Object
+ */
+ @Override
+ protected ResponseEntity handleMissingServletRequestParameter(MissingServletRequestParameterException ex,
+ HttpHeaders headers, HttpStatus status, WebRequest request) {
+ String error = ex.getParameterName() + " parameter is missing";
+ return buildResponseEntity(new RestErrorResponse(HttpStatus.BAD_REQUEST, error, ex));
+ }
+
+ /**
+ * Customize the response for ServletRequestBindingException.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param request the current request
+ * @return the RestErrorResponse Object
+ */
+ @Override
+ protected ResponseEntity handleServletRequestBindingException(ServletRequestBindingException ex,
+ HttpHeaders headers, HttpStatus status, WebRequest request) {
+ String error = "ServletRequest Bind Error";
+ return buildResponseEntity(new RestErrorResponse(HttpStatus.BAD_REQUEST, error ,ex));
+ }
+
+ /**
+ * Customize the response for ConversionNotSupportedException.
+ * This method delegates to {@link #handleExceptionInternal}.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param request the current request
+ * @return a {@code ResponseEntity} instance
+ */
+ @Override
+ protected ResponseEntity handleConversionNotSupported(ConversionNotSupportedException ex,
+ HttpHeaders headers, HttpStatus status, WebRequest request) {
+ return handleExceptionInternal(ex, null, headers, status, request);
+ }
+
+ /**
+ * Customize the response for TypeMismatchException.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param request the current request
+ * @return the RestErrorResponse Object
+ */
+ @Override
+ protected ResponseEntity handleTypeMismatch(TypeMismatchException ex, HttpHeaders headers,
+ HttpStatus status, WebRequest request) {
+ String error = "Request parameter value type mismatch error. ";
+ return buildResponseEntity(new RestErrorResponse(HttpStatus.BAD_REQUEST, error ,ex));
+ }
+
+ /**
+ * Customize the response for HttpMessageNotReadableException.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param request the current request
+ * @return the RestErrorResponse Object
+ */
+ @Override
+ protected ResponseEntity handleHttpMessageNotReadable(HttpMessageNotReadableException ex,
+ HttpHeaders headers, HttpStatus status, WebRequest request) {
+ String error = "Malformed JSON request";
+ return buildResponseEntity(new RestErrorResponse(HttpStatus.BAD_REQUEST, error, ex));
+ }
+
+ /**
+ * Customize the response for HttpMessageNotWritableException.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param request the current request
+ * @return the RestErrorResponse Object
+ */
+ @Override
+ protected ResponseEntity handleHttpMessageNotWritable(HttpMessageNotWritableException ex,
+ HttpHeaders headers, HttpStatus status, WebRequest request) {
+
+ String error = "Error writing JSON output";
+ return buildResponseEntity(new RestErrorResponse(HttpStatus.INTERNAL_SERVER_ERROR, error, ex));
+ }
+
+ /**
+ * Customize the response for MethodArgumentNotValidException.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param request the current request
+ * @return the RestErrorResponse Object
+ */
+ @Override
+ protected ResponseEntity handleMethodArgumentNotValid(MethodArgumentNotValidException ex,
+ HttpHeaders headers, HttpStatus status, WebRequest request) {
+
+ String error = "Method Argument Validation Error.";
+ RestErrorResponse restErrorResponse = new RestErrorResponse(HttpStatus.BAD_REQUEST, error, ex);
+ restErrorResponse.addValidationErrors(ex.getBindingResult().getFieldErrors());
+ restErrorResponse.addValidationError(ex.getBindingResult().getGlobalErrors());
+ return buildResponseEntity(restErrorResponse);
+ }
+
+ @ExceptionHandler(ConstraintViolationException.class)
+ public ResponseEntity handleConstraintViolation(ConstraintViolationException ex){
+ String error = "Constraint Violation Error.";
+ RestErrorResponse restErrorResponse = new RestErrorResponse(HttpStatus.BAD_REQUEST, error, ex);
+ restErrorResponse.addValidationErrors(ex.getConstraintViolations());
+ return buildResponseEntity(restErrorResponse);
+ }
+
+ /**
+ * Customize the response for MissingServletRequestPartException.
+ * This method delegates to {@link #handleExceptionInternal}.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param request the current request
+ * @return the RestErrorResponse Object
+ */
+ @Override
+ protected ResponseEntity handleMissingServletRequestPart(MissingServletRequestPartException ex,
+ HttpHeaders headers, HttpStatus status, WebRequest request) {
+
+ return handleExceptionInternal(ex, null, headers, status, request);
+ }
+
+ /**
+ * Customize the response for BindException.
+ * This method delegates to {@link #handleExceptionInternal}.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param request the current request
+ * @return a {@code ResponseEntity} instance
+ */
+ @Override
+ protected ResponseEntity handleBindException(BindException ex, HttpHeaders headers,
+ HttpStatus status, WebRequest request) {
+
+ return handleExceptionInternal(ex, null, headers, status, request);
+ }
+
+ /**
+ * Customize the response for NoHandlerFoundException.
+ * This method delegates to {@link #handleExceptionInternal}.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param request the current request
+ * @return a {@code ResponseEntity} instance
+ * @since 4.0
+ */
+ @Override
+ protected ResponseEntity handleNoHandlerFoundException(
+ NoHandlerFoundException ex, HttpHeaders headers, HttpStatus status, WebRequest request) {
+
+ return handleExceptionInternal(ex, null, headers, status, request);
+ }
+
+ /**
+ * Customize the response for NoHandlerFoundException.
+ * This method delegates to {@link #handleExceptionInternal}.
+ * @param ex the exception
+ * @param headers the headers to be written to the response
+ * @param status the selected response status
+ * @param webRequest the current request
+ * @return a {@code ResponseEntity} instance
+ * @since 4.2.8
+ */
+ @Override
+ protected ResponseEntity handleAsyncRequestTimeoutException(
+ AsyncRequestTimeoutException ex, HttpHeaders headers, HttpStatus status, WebRequest webRequest) {
+
+ if (webRequest instanceof ServletWebRequest) {
+ ServletWebRequest servletRequest = (ServletWebRequest) webRequest;
+ HttpServletRequest request = servletRequest.getNativeRequest(HttpServletRequest.class);
+ HttpServletResponse response = servletRequest.getNativeResponse(HttpServletResponse.class);
+ if (response.isCommitted()) {
+ if (logger.isErrorEnabled()) {
+ logger.error("Async timeout for " + request.getMethod() + " [" + request.getRequestURI() + "]");
+ }
+ return null;
+ }
+ }
+
+ return handleExceptionInternal(ex, null, headers, status, webRequest);
+ }
+
+ private ResponseEntity buildResponseEntity(RestErrorResponse restErrorResponse) {
+ return new ResponseEntity(restErrorResponse, restErrorResponse.getStatus());
+ }
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/exception/KafkaExceptionHandler.java b/src/main/java/org/gnuhpc/bigdata/exception/KafkaExceptionHandler.java
index a3c649a..2e1972f 100644
--- a/src/main/java/org/gnuhpc/bigdata/exception/KafkaExceptionHandler.java
+++ b/src/main/java/org/gnuhpc/bigdata/exception/KafkaExceptionHandler.java
@@ -14,17 +14,6 @@
@Log4j
@RestControllerAdvice
public class KafkaExceptionHandler {
- @ExceptionHandler(Exception.class)
- public RestErrorResponse handleException(Exception ex){
- RestErrorResponse.Builder builder = new RestErrorResponse.Builder();
- RestErrorResponse response = builder
- .setCode(KafkaErrorCode.UNKNOWN.ordinal())
- .setMessage("Default Exception happened!")
- .setDeveloperMessage(ex.getMessage())
- .setStatus(HttpStatus.SERVICE_UNAVAILABLE).build();
- return response;
- }
-
@ExceptionHandler(ApiException.class)
public RestErrorResponse kafkaApiException(ApiException ex) {
RestErrorResponse.Builder responseBuilder = new RestErrorResponse.Builder();
@@ -34,43 +23,4 @@ public RestErrorResponse kafkaApiException(ApiException ex) {
.setDeveloperMessage(ex.getMessage())
.build();
}
-
- @ExceptionHandler(RuntimeException.class)
- public RestErrorResponse runtimeException(RuntimeException ex){
- RestErrorResponse.Builder responseBuilder = new RestErrorResponse.Builder();
- return responseBuilder.setStatus(HttpStatus.SERVICE_UNAVAILABLE)
- .setCode(KafkaErrorCode.UNKNOWN.ordinal())
- .setMessage("Runtime Exception happened!")
- .setDeveloperMessage(ex.getMessage())
- .build();
- }
-
-
- @ExceptionHandler(ConstraintViolationException.class)
- public RestErrorResponse constraintViolationException(ConstraintViolationException ex){
- StringBuilder message = new StringBuilder();
- Set> violations = ex.getConstraintViolations();
- for (ConstraintViolation> violation : violations) {
- message.append(violation.getMessage().concat(";"));
- }
-
- RestErrorResponse.Builder responseBuilder = new RestErrorResponse.Builder();
- return responseBuilder.setStatus(HttpStatus.SERVICE_UNAVAILABLE)
- .setCode(KafkaErrorCode.UNKNOWN_TOPIC_OR_PARTITION.ordinal())
- .setMessage("Constraint Violation Exception happened!")
- .setMessage(message.toString().substring(0,message.length()-1))
- .setDeveloperMessage(ex.getMessage())
- .build();
- }
-
-
- @ExceptionHandler(ResourceNotFoundException.class)
- public RestErrorResponse serviceNotAvailableException(ServiceNotAvailableException ex){
- RestErrorResponse.Builder responseBuilder = new RestErrorResponse.Builder();
- return responseBuilder.setStatus(HttpStatus.SERVICE_UNAVAILABLE)
- .setCode(KafkaErrorCode.SERVICE_DOWN.ordinal())
- .setMessage("Service not Available happened: " + ex)
- .setDeveloperMessage(ex.getMessage())
- .build();
- }
}
diff --git a/src/main/java/org/gnuhpc/bigdata/exception/RestErrorResponse.java b/src/main/java/org/gnuhpc/bigdata/exception/RestErrorResponse.java
index f452a52..72e35bf 100644
--- a/src/main/java/org/gnuhpc/bigdata/exception/RestErrorResponse.java
+++ b/src/main/java/org/gnuhpc/bigdata/exception/RestErrorResponse.java
@@ -1,20 +1,58 @@
package org.gnuhpc.bigdata.exception;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.hibernate.validator.internal.engine.path.PathImpl;
import org.springframework.http.HttpStatus;
import org.springframework.util.ObjectUtils;
+import org.springframework.validation.FieldError;
+import org.springframework.validation.ObjectError;
+
+import javax.validation.ConstraintViolation;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
@Data
public class RestErrorResponse {
- private final HttpStatus status;
- private final int code;
- private final String message;
- private final String developerMessage;
- private final String moreInfoUrl;
+ private HttpStatus status;
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
+ private LocalDateTime timestamp;
+ private int code;
+ private String message;
+ private String developerMessage;
+ private String moreInfoUrl;
+ private List subErrorList;
+
+ public RestErrorResponse() {
+ //this.timestamp = new Date();
+ this.timestamp = LocalDateTime.now();
+ }
+
+ public RestErrorResponse(HttpStatus status, String message, Throwable ex) {
+ this();
+ this.status = status;
+ this.code = status.value();
+ this.message = message;
+ this.developerMessage = ex.getLocalizedMessage();
+ }
+
+ public RestErrorResponse(HttpStatus status, String message, String moreInfoUrl, Throwable ex) {
+ this();
+ this.status = status;
+ this.code = status.value();
+ this.message = message;
+ this.developerMessage = ex.getLocalizedMessage();
+ this.moreInfoUrl = moreInfoUrl;
+ }
public RestErrorResponse(HttpStatus status, int code, String message, String developerMessage, String moreInfoUrl) {
+ this();
if (status == null) {
throw new NullPointerException("HttpStatus argument cannot be null.");
}
@@ -25,7 +63,6 @@ public RestErrorResponse(HttpStatus status, int code, String message, String dev
this.moreInfoUrl = moreInfoUrl;
}
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -58,8 +95,60 @@ public String toString() {
.toString();
}
- public static class Builder {
+ private void addSubError(RestSubError subError) {
+ if (subErrorList == null) {
+ subErrorList = new ArrayList<>();
+ }
+ subErrorList.add(subError);
+ }
+
+ private void addValidationError(String object, String field, Object rejectedValue, String message) {
+ addSubError(new RestValidationError(object, field, rejectedValue, message));
+ }
+
+ private void addValidationError(String object, String message) {
+ addSubError(new RestValidationError(object, message));
+ }
+
+ private void addValidationError(FieldError fieldError) {
+ this.addValidationError(
+ fieldError.getObjectName(),
+ fieldError.getField(),
+ fieldError.getRejectedValue(),
+ fieldError.getDefaultMessage());
+ }
+ void addValidationErrors(List fieldErrors) {
+ fieldErrors.forEach(this::addValidationError);
+ }
+
+ private void addValidationError(ObjectError objectError) {
+ this.addValidationError(
+ objectError.getObjectName(),
+ objectError.getDefaultMessage());
+ }
+
+ void addValidationError(List globalErrors) {
+ globalErrors.forEach(this::addValidationError);
+ }
+
+ /**
+ * Utility method for adding error of ConstraintViolation. Usually when a @Validated validation fails.
+ * @param cv the ConstraintViolation
+ */
+ private void addValidationError(ConstraintViolation> cv) {
+ this.addValidationError(
+ cv.getRootBeanClass().getSimpleName(),
+ ((PathImpl) cv.getPropertyPath()).getLeafNode().asString(),
+ cv.getInvalidValue(),
+ cv.getMessage());
+ }
+
+ void addValidationErrors(Set> constraintViolations) {
+ constraintViolations.forEach(this::addValidationError);
+ }
+
+ public static class Builder {
private HttpStatus status;
private int code;
private String message;
@@ -107,4 +196,23 @@ public RestErrorResponse build() {
return new RestErrorResponse(this.status, this.code, this.message, this.developerMessage, this.moreInfoUrl);
}
}
+
+ abstract class RestSubError {
+
+ }
+
+ @Data
+ @EqualsAndHashCode(callSuper = false)
+ @AllArgsConstructor
+ class RestValidationError extends RestSubError {
+ private String object;
+ private String field;
+ private Object rejectedValue;
+ private String message;
+
+ RestValidationError(String object, String message) {
+ this.object = object;
+ this.message = message;
+ }
+ }
}
diff --git a/src/main/java/org/gnuhpc/bigdata/model/HealthCheckResult.java b/src/main/java/org/gnuhpc/bigdata/model/HealthCheckResult.java
new file mode 100644
index 0000000..910f7bd
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/model/HealthCheckResult.java
@@ -0,0 +1,20 @@
+package org.gnuhpc.bigdata.model;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.time.LocalDateTime;
+
+@Getter
+@Setter
+public class HealthCheckResult {
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
+ private LocalDateTime timestamp;
+ public String status;
+ public String msg;
+
+ public HealthCheckResult() {
+ this.timestamp = LocalDateTime.now();
+ }
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/model/JMXAttribute.java b/src/main/java/org/gnuhpc/bigdata/model/JMXAttribute.java
new file mode 100644
index 0000000..3419edc
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/model/JMXAttribute.java
@@ -0,0 +1,330 @@
+package org.gnuhpc.bigdata.model;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import javax.management.*;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@Getter
+@Setter
+public abstract class JMXAttribute {
+ private MBeanAttributeInfo attribute;
+ private ObjectName beanName;
+ private MBeanServerConnection connection;
+ private String attributeName;
+ private String beanStringName;
+ private String domain;
+ private HashMap beanParameters;
+ private JMXConfiguration matchingConf;
+ private LinkedHashMap> valueConversions = new LinkedHashMap>();
+ private static final List EXCLUDED_BEAN_PARAMS = Arrays.asList("domain", "domain_regex", "bean_name", "bean",
+ "bean_regex", "attribute", "exclude_tags", "tags");
+ protected static final String METRIC_TYPE = "metric_type";
+ protected static final String ALIAS = "alias";
+ private static final String FIRST_CAP_PATTERN = "(.)([A-Z][a-z]+)";
+ private static final String ALL_CAP_PATTERN = "([a-z0-9])([A-Z])";
+ private static final String METRIC_REPLACEMENT = "([^a-zA-Z0-9_.]+)|(^[^a-zA-Z]+)";
+ private static final String DOT_UNDERSCORE = "_*\\._*";
+
+ public JMXAttribute(MBeanAttributeInfo attribute, ObjectName beanName, MBeanServerConnection connection) {
+ this.attribute = attribute;
+ this.attributeName = attribute.getName();
+ this.beanName = beanName;
+ this.beanStringName = beanName.toString();
+ this.connection = connection;
+ // A bean name is formatted like that: org.apache.cassandra.db:type=Caches,keyspace=system,cache=HintsColumnFamilyKeyCache
+ // i.e. : domain:bean_parameter1,bean_parameter2
+ //Note: some beans have a ':' in the name. Example: some.domain:name="some.bean.0.0.0.0:80.some-metric"
+ int splitPosition = beanStringName.indexOf(':');
+ String domain = beanStringName.substring(0, splitPosition);
+ String beanParametersString = beanStringName.substring(splitPosition+1);
+ this.domain = domain;
+ this.matchingConf = null;
+
+ HashMap beanParametersHash = getBeanParametersHash(beanParametersString);
+ //LinkedList beanParametersList = getBeanParametersList(instanceName, beanParametersHash, instanceTags);
+ this.beanParameters = beanParametersHash;
+ }
+
+
+ public abstract LinkedList> getMetrics() throws AttributeNotFoundException, InstanceNotFoundException, MBeanException, ReflectionException, IOException;
+
+ /**
+ * An abstract function implemented in the inherited classes JMXSimpleAttribute and JMXComplexAttribute
+ *
+ * @param conf Configuration a Configuration object that will be used to check if the JMX Attribute match this configuration
+ * @return a boolean that tells if the attribute matches the configuration or not
+ */
+ public abstract boolean match(JMXConfiguration conf);
+
+ public static HashMap getBeanParametersHash(String beanParametersString) {
+ String[] beanParameters = beanParametersString.split(",");
+ HashMap beanParamsMap = new HashMap(beanParameters.length);
+ for (String param : beanParameters) {
+ String[] paramSplit = param.split("=");
+ if (paramSplit.length > 1) {
+ beanParamsMap.put(new String(paramSplit[0]), new String(paramSplit[1]));
+ } else {
+ beanParamsMap.put(new String(paramSplit[0]), "");
+ }
+ }
+
+ return beanParamsMap;
+ }
+
+ boolean matchDomain(JMXConfiguration conf) {
+ String includeDomain = conf.getInclude().getDomain();
+ Pattern includeDomainRegex = conf.getInclude().getDomainRegex();
+
+ return (includeDomain == null || includeDomain.equals(this.getDomain()))
+ && (includeDomainRegex == null || includeDomainRegex.matcher(this.getDomain()).matches());
+ }
+
+ boolean matchBean(JMXConfiguration configuration) {
+ return matchBeanName(configuration) && matchBeanRegex(configuration.getInclude(), true);
+ }
+
+ private boolean matchBeanName(JMXConfiguration configuration) {
+ JMXFilter include = configuration.getInclude();
+
+ if (!include.isEmptyBeanName() && !include.getBeanNames().contains(this.getBeanStringName())) {
+ return false;
+ }
+
+ for (String bean_attr : include.keySet()) {
+ if (EXCLUDED_BEAN_PARAMS.contains(bean_attr)) {
+ continue;
+ }
+
+ ArrayList beanValues = include.getParameterValues(bean_attr);
+
+ if (beanParameters.get(bean_attr) == null || !(beanValues.contains(beanParameters.get(bean_attr)))){
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean matchBeanRegex(JMXFilter filter, boolean matchIfNoRegex) {
+ if (filter == null) return matchIfNoRegex;
+ ArrayList beanRegexes = filter.getBeanRegexes();
+ if (beanRegexes.isEmpty()) {
+ return matchIfNoRegex;
+ }
+
+ for (Pattern beanRegex : beanRegexes) {
+ Matcher m = beanRegex.matcher(beanStringName);
+
+ if(m.matches()) {
+ for (int i = 0; i<= m.groupCount(); i++) {
+ this.beanParameters.put(Integer.toString(i), m.group(i));
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ boolean excludeMatchDomain(JMXConfiguration conf) {
+ if (conf.getExclude() == null) return false;
+ String excludeDomain = conf.getExclude().getDomain();
+ Pattern excludeDomainRegex = conf.getExclude().getDomainRegex();
+
+ return excludeDomain != null && excludeDomain.equals(domain)
+ || excludeDomainRegex != null && excludeDomainRegex.matcher(domain).matches();
+ }
+
+ boolean excludeMatchBean(JMXConfiguration configuration) {
+ return excludeMatchBeanName(configuration) || matchBeanRegex(configuration.getExclude(), false);
+ }
+
+ private boolean excludeMatchBeanName(JMXConfiguration conf) {
+ JMXFilter exclude = conf.getExclude();
+ if (exclude == null) return false;
+ ArrayList beanNames = exclude.getBeanNames();
+
+ if(beanNames.contains(beanStringName)){
+ return true;
+ }
+
+ for (String bean_attr : exclude.keySet()) {
+ if (EXCLUDED_BEAN_PARAMS.contains(bean_attr)) {
+ continue;
+ }
+
+ if (beanParameters.get(bean_attr) == null) {
+ continue;
+ }
+
+ ArrayList beanValues = exclude.getParameterValues(bean_attr);
+ for (String beanVal : beanValues) {
+ if (beanParameters.get(bean_attr).equals(beanVal)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ Object getJmxValue() throws AttributeNotFoundException, InstanceNotFoundException, MBeanException, ReflectionException, IOException {
+ return this.getConnection().getAttribute(this.getBeanName(), this.getAttribute().getName());
+ }
+
+ public static List getExcludedBeanParams(){
+ return EXCLUDED_BEAN_PARAMS;
+ }
+
+ double castToDouble(Object metricValue, String field) {
+ Object value = convertMetricValue(metricValue, field);
+
+ if (value instanceof String) {
+ return Double.parseDouble((String) value);
+ } else if (value instanceof Integer) {
+ return new Double((Integer) (value));
+ } else if (value instanceof AtomicInteger) {
+ return new Double(((AtomicInteger) (value)).get());
+ } else if (value instanceof AtomicLong) {
+ Long l = ((AtomicLong) (value)).get();
+ return l.doubleValue();
+ } else if (value instanceof Double) {
+ return (Double) value;
+ } else if (value instanceof Boolean) {
+ return ((Boolean) value ? 1.0 : 0.0);
+ } else if (value instanceof Long) {
+ Long l = new Long((Long) value);
+ return l.doubleValue();
+ } else if (value instanceof Number) {
+ return ((Number) value).doubleValue();
+ } else {
+ try {
+ return new Double((Double) value);
+ } catch (Exception e) {
+ throw new NumberFormatException();
+ }
+ }
+ }
+
+ Object convertMetricValue(Object metricValue, String field) {
+ Object converted = metricValue;
+
+ if (!getValueConversions(field).isEmpty()) {
+ converted = getValueConversions(field).get(metricValue);
+ if (converted == null && getValueConversions(field).get("default") != null) {
+ converted = getValueConversions(field).get("default");
+ }
+ }
+
+ return converted;
+ }
+
+ @SuppressWarnings("unchecked")
+ HashMap getValueConversions(String field) {
+ String fullAttributeName =(field!=null)?(getAttribute().getName() + "." + field):(getAttribute().getName());
+ if (valueConversions.get(fullAttributeName) == null) {
+ Object includedAttribute = matchingConf.getInclude().getAttribute();
+ if (includedAttribute instanceof LinkedHashMap, ?>) {
+ LinkedHashMap> attribute =
+ ((LinkedHashMap>>) includedAttribute).get(fullAttributeName);
+
+ if (attribute != null) {
+ valueConversions.put(fullAttributeName, attribute.get("values"));
+ }
+ }
+ if (valueConversions.get(fullAttributeName) == null) {
+ valueConversions.put(fullAttributeName, new LinkedHashMap());
+ }
+ }
+
+ return valueConversions.get(fullAttributeName);
+ }
+
+ /**
+ * Overload `getAlias` method.
+ *
+ * Note: used for `JMXSimpleAttribute` only, as `field` is null.
+ */
+ protected String getAlias(){
+ return getAlias(null);
+ }
+
+ /**
+ * Get attribute alias.
+ *
+ * In order, tries to:
+ * * Use `alias_match` to generate an alias with a regular expression
+ * * Use `alias` directly
+ * * Create an generic alias prefixed with user's `metric_prefix` preference or default to `jmx`
+ *
+ * Argument(s):
+ * * (Optional) `field`
+ * `Null` for `JMXSimpleAttribute`.
+ */
+ protected String getAlias(String field) {
+ String alias = null;
+
+ JMXFilter include = getMatchingConf().getInclude();
+
+ String fullAttributeName =(field!=null)?(getAttribute().getName() + "." + field):(getAttribute().getName());
+
+ if (include.getAttribute() instanceof LinkedHashMap, ?>) {
+ LinkedHashMap> attribute = (LinkedHashMap>) (include.getAttribute());
+ alias = getUserAlias(attribute, fullAttributeName);
+ }
+
+ //If still null - generate generic alias
+ if (alias == null) {
+ alias = "jmx." + getDomain() + "." + fullAttributeName;
+ }
+
+ return alias;
+ }
+
+ /**
+ * Retrieve user defined alias. Substitute regular expression named groups.
+ *
+ * Example:
+ * ```
+ * bean: org.datadog.jmxfetch.test:foo=Bar,qux=Baz
+ * attribute:
+ * toto:
+ * alias: my.metric.$foo.$attribute
+ * ```
+ * returns a metric name `my.metric.bar.toto`
+ */
+ private String getUserAlias(LinkedHashMap> attribute, String fullAttributeName){
+ String alias = attribute.get(fullAttributeName).get(ALIAS);
+ if (alias == null) {
+ return null;
+ }
+
+ alias = this.replaceByAlias(alias);
+
+ // Attribute & domain
+ alias = alias.replace("$attribute", fullAttributeName);
+ alias = alias.replace("$domain", domain);
+
+ return alias;
+ }
+
+ private String replaceByAlias(String alias){
+ // Bean parameters
+ for (Map.Entry param : beanParameters.entrySet()) {
+ alias = alias.replace("$" + param.getKey(), param.getValue());
+ }
+ return alias;
+ }
+
+ static String convertMetricName(String metricName) {
+ metricName = metricName.replaceAll(FIRST_CAP_PATTERN, "$1_$2");
+ metricName = metricName.replaceAll(ALL_CAP_PATTERN, "$1_$2").toLowerCase();
+ metricName = metricName.replaceAll(METRIC_REPLACEMENT, "_");
+ metricName = metricName.replaceAll(DOT_UNDERSCORE, ".").trim();
+ return metricName;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/gnuhpc/bigdata/model/JMXClient.java b/src/main/java/org/gnuhpc/bigdata/model/JMXClient.java
new file mode 100644
index 0000000..2c30e94
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/model/JMXClient.java
@@ -0,0 +1,128 @@
+package org.gnuhpc.bigdata.model;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.log4j.Log4j;
+import org.gnuhpc.bigdata.config.JMXConfig;
+import org.gnuhpc.bigdata.exception.CollectorException;
+
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.SocketTimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+@Getter
+@Setter
+@Log4j
+public class JMXClient {
+ private String ip;
+ private String port;
+ private JMXConnector jmxConnector = null;
+ private static final ThreadFactory daemonThreadFactory = new DaemonThreadFactory();
+ private String jmxServiceURL;
+ private Map jmxEnv;
+ private static final long CONNECTION_TIMEOUT = 10000;
+ private static final long JMX_TIMEOUT = 20;
+
+ public JMXClient() {
+ jmxEnv = new HashMap<>();
+ jmxEnv.put(JMXConfig.JMX_CONNECT_TIMEOUT, CONNECTION_TIMEOUT);
+ }
+
+ public JMXClient(String host) {
+ this();
+ String[] ipAndPort = host.split(":");
+ this.ip = ipAndPort[0];
+ this.port = ipAndPort[1];
+ this.jmxServiceURL = new StringBuilder().append(JMXConfig.JMX_PROTOCOL)
+ .append(this.ip)
+ .append(":")
+ .append(this.port)
+ .append("/jmxrmi").toString();
+ }
+
+ public JMXConnector connect() throws CollectorException {
+ try {
+ JMXServiceURL jmxServiceURL = new JMXServiceURL(this.jmxServiceURL);
+ jmxConnector = JMXConnectorFactory.connect(jmxServiceURL, jmxEnv);
+ } catch (MalformedURLException e) {
+ throw new CollectorException(String.format("%s occurred. URL: %s. Reason: %s",
+ e.getClass().getCanonicalName(), this.jmxServiceURL, e.getCause()), e);
+ } catch (IOException e) {
+ throw new CollectorException(String.format("%s occurred. URL: %s. Reason: %s",
+ e.getClass().getCanonicalName(), this.jmxServiceURL, e.getCause()), e);
+ }
+ return jmxConnector;
+ }
+
+ /**
+ * This code comes from Datadog jmxFetch.
+ * https://github.com/DataDog/jmxfetch/blob/master/src/main/java/org/datadog/jmxfetch/Connection.java
+ */
+ public JMXConnector connectWithTimeout() throws IOException, InterruptedException {
+ JMXServiceURL url = new JMXServiceURL(this.jmxServiceURL);
+
+ BlockingQueue mailbox = new ArrayBlockingQueue(1);
+
+ ExecutorService executor = Executors.newSingleThreadExecutor(daemonThreadFactory);
+ executor.submit(() -> {
+ try {
+ JMXConnector connector = JMXConnectorFactory.connect(url, jmxEnv);
+ if (!mailbox.offer(connector)) {
+ connector.close();
+ }
+ } catch (Throwable t) {
+ mailbox.offer(t);
+ }
+ });
+ Object result;
+ try {
+ result = mailbox.poll(JMX_TIMEOUT, TimeUnit.SECONDS);
+ if (result == null) {
+ if (!mailbox.offer(""))
+ result = mailbox.take();
+ }
+ } catch (InterruptedException e) {
+ throw e;
+ } finally {
+ executor.shutdown();
+ }
+ if (result == null) {
+ log.warn("Connection timed out: " + url);
+ throw new SocketTimeoutException("Connection timed out: " + url);
+ }
+ if (result instanceof JMXConnector) {
+ jmxConnector = (JMXConnector) result;
+ return jmxConnector;
+ }
+ try {
+ throw (Throwable) result;
+ } catch (Throwable e) {
+ throw new IOException(e.toString(), e);
+ }
+ }
+
+ public void close() throws CollectorException {
+ checkNotNull(jmxConnector);
+ try {
+ jmxConnector.close();
+ } catch (IOException e) {
+ throw new CollectorException("Cannot close connection. ", e);
+ }
+ }
+
+ private static class DaemonThreadFactory implements ThreadFactory {
+ public Thread newThread(Runnable r) {
+ Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setDaemon(true);
+ return t;
+ }
+ }
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/model/JMXComplexAttribute.java b/src/main/java/org/gnuhpc/bigdata/model/JMXComplexAttribute.java
new file mode 100644
index 0000000..6107797
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/model/JMXComplexAttribute.java
@@ -0,0 +1,167 @@
+package org.gnuhpc.bigdata.model;
+
+import javax.management.*;
+import javax.management.openmbean.CompositeData;
+import java.io.IOException;
+import java.util.*;
+
+public class JMXComplexAttribute extends JMXAttribute {
+ private HashMap> subAttributeList;
+
+ public JMXComplexAttribute(MBeanAttributeInfo attribute, ObjectName beanName, MBeanServerConnection connection) {
+ super(attribute, beanName, connection);
+ this.subAttributeList = new HashMap<>();
+ }
+
+ @Override
+ public LinkedList> getMetrics()
+ throws AttributeNotFoundException, InstanceNotFoundException,
+ MBeanException, ReflectionException, IOException {
+
+ LinkedList> metrics = new LinkedList>();
+
+ for (Map.Entry> pair : subAttributeList.entrySet()) {
+ String subAttribute = pair.getKey();
+ HashMap metric = pair.getValue();
+ if (metric.get(ALIAS) == null) {
+ metric.put(ALIAS, convertMetricName(getAlias(subAttribute)));
+ }
+ if (metric.get(METRIC_TYPE) == null) {
+ metric.put("domain", getBeanName().getDomain());
+ metric.put("beanName", getBeanName().toString());
+ metric.put("attributeName", subAttribute);
+ metric.put(METRIC_TYPE, getMetricType(subAttribute));
+ }
+
+ /*
+ if (metric.get("tags") == null) {
+ metric.put("tags", getTags());
+ }
+ */
+
+ metric.put("value", castToDouble(getValue(subAttribute), subAttribute));
+ metrics.add(metric);
+
+ }
+ return metrics;
+
+ }
+
+ private Object getMetricType(String subAttribute) {
+ String subAttributeName = getAttribute().getName() + "." + subAttribute;
+ String metricType = null;
+
+ JMXFilter include = getMatchingConf().getInclude();
+ if (include.getAttribute() instanceof LinkedHashMap, ?>) {
+ LinkedHashMap> attribute = (LinkedHashMap>) (include.getAttribute());
+ metricType = attribute.get(subAttributeName).get(METRIC_TYPE);
+ if (metricType == null) {
+ metricType = attribute.get(subAttributeName).get("type");
+ }
+ }
+
+ if (metricType == null) {
+ metricType = "gauge";
+ }
+
+ return metricType;
+ }
+
+ private Object getValue(String subAttribute) throws AttributeNotFoundException, InstanceNotFoundException,
+ MBeanException, ReflectionException, IOException {
+
+ Object value = this.getJmxValue();
+ String attributeType = getAttribute().getType();
+
+ if ("javax.management.openmbean.CompositeData".equals(attributeType)) {
+ CompositeData data = (CompositeData) value;
+ return data.get(subAttribute);
+ } else if (("java.util.HashMap".equals(attributeType)) || ("java.util.Map".equals(attributeType))) {
+ Map data = (Map) value;
+ return data.get(subAttribute);
+ }
+ throw new NumberFormatException();
+ }
+
+ @Override
+ public boolean match(JMXConfiguration configuration) {
+ if (!matchDomain(configuration)
+ || !matchBean(configuration)
+ || excludeMatchDomain(configuration)
+ || excludeMatchBean(configuration)) {
+ return false;
+ }
+
+ try {
+ populateSubAttributeList(getJmxValue());
+ } catch (Exception e) {
+ return false;
+ }
+
+ return matchAttribute(configuration) && !excludeMatchAttribute(configuration);
+ }
+
+ private void populateSubAttributeList(Object attributeValue) {
+ String attributeType = getAttribute().getType();
+ if ("javax.management.openmbean.CompositeData".equals(attributeType)) {
+ CompositeData data = (CompositeData) attributeValue;
+ for (String key : data.getCompositeType().keySet()) {
+ this.subAttributeList.put(key, new HashMap());
+ }
+ } else if (("java.util.HashMap".equals(attributeType)) || ("java.util.Map".equals(attributeType))) {
+ Map data = (Map) attributeValue;
+ for (String key : data.keySet()) {
+ this.subAttributeList.put(key, new HashMap());
+ }
+ }
+ }
+
+ private boolean excludeMatchAttribute(JMXConfiguration configuration) {
+ JMXFilter exclude = configuration.getExclude();
+ if (exclude == null) return false;
+ if (exclude.getAttribute() != null && matchSubAttribute(exclude, getAttributeName(), false)) {
+ return true;
+ }
+
+ Iterator it = subAttributeList.keySet().iterator();
+ while (it.hasNext()) {
+ String subAttribute = it.next();
+ if (matchSubAttribute(exclude, getAttributeName() + "." + subAttribute, false)) {
+ it.remove();
+ }
+ }
+ return subAttributeList.size() <= 0;
+ }
+
+ private boolean matchAttribute(JMXConfiguration configuration) {
+ if (matchSubAttribute(configuration.getInclude(), getAttributeName(), true)) {
+ return true;
+ }
+
+ Iterator it = subAttributeList.keySet().iterator();
+
+ while (it.hasNext()) {
+ String subAttribute = it.next();
+ if (!matchSubAttribute(configuration.getInclude(), getAttributeName() + "." + subAttribute, true)) {
+ it.remove();
+ }
+ }
+
+ return subAttributeList.size() > 0;
+ }
+
+ private boolean matchSubAttribute(JMXFilter params, String subAttributeName, boolean matchOnEmpty) {
+ if ((params.getAttribute() instanceof LinkedHashMap, ?>)
+ && ((LinkedHashMap) (params.getAttribute())).containsKey(subAttributeName)) {
+ return true;
+ } else if ((params.getAttribute() instanceof ArrayList>
+ && ((ArrayList) (params.getAttribute())).contains(subAttributeName))) {
+ return true;
+ } else if (params.getAttribute() == null) {
+ return matchOnEmpty;
+ }
+ return false;
+
+ }
+
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/model/JMXConfiguration.java b/src/main/java/org/gnuhpc/bigdata/model/JMXConfiguration.java
new file mode 100644
index 0000000..ee47f14
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/model/JMXConfiguration.java
@@ -0,0 +1,226 @@
+package org.gnuhpc.bigdata.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Getter;
+import lombok.Setter;
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+import java.util.*;
+
+@Getter
+@Setter
+public class JMXConfiguration {
+ private JMXFilter include;
+ private JMXFilter exclude;
+
+ /**
+ * Access JMXConfiguration elements more easily
+ *
+ * Also provides helper methods to extract common information among JMXFilters.
+ */
+ @JsonCreator
+ public JMXConfiguration(@JsonProperty("include") JMXFilter include, @JsonProperty("exclude") JMXFilter exclude) {
+ this.include = include;
+ this.exclude = exclude;
+ }
+
+ private Boolean hasInclude(){
+ return getInclude() != null;
+ }
+
+ /**
+ * JMXFilter a configuration list to keep the ones with `include` JMXFilters.
+ *
+ * @param configurationList the configuration list to JMXFilter
+ *
+ * @return a configuration list
+ */
+ private static LinkedList getIncludeConfigurationList(LinkedList configurationList){
+ LinkedList includeConfigList = new LinkedList(configurationList);
+ Iterator confItr = includeConfigList.iterator();
+
+ while(confItr.hasNext()) {
+ JMXConfiguration conf = confItr.next();
+ if (!conf.hasInclude()) {
+ confItr.remove();
+ }
+ }
+ return includeConfigList;
+ }
+
+ /**
+ * Extract `include` JMXFilters from the configuration list and index then by domain name.
+ *
+ * @param configurationList the configuration list to process
+ *
+ * @return JMXFilters by domain name
+ */
+ private static HashMap> getIncludeJMXFiltersByDomain(LinkedList configurationList){
+ HashMap> includeJMXFiltersByDomain = new HashMap>();
+
+ for (JMXConfiguration conf : configurationList) {
+ JMXFilter JMXFilter = conf.getInclude();
+ LinkedList JMXFilters = new LinkedList();
+
+ // Convert bean name, to a proper JMXFilter, i.e. a hash
+ if (!JMXFilter.isEmptyBeanName()) {
+ ArrayList beanNames = JMXFilter.getBeanNames();
+
+ for (String beanName : beanNames) {
+ String[] splitBeanName = beanName.split(":");
+ String domain = splitBeanName[0];
+ String rawBeanParameters = splitBeanName[1];
+ HashMap beanParametersHash = JMXAttribute.getBeanParametersHash(rawBeanParameters);
+ beanParametersHash.put("domain", domain);
+ JMXFilters.add(new JMXFilter(beanParametersHash));
+ }
+ } else {
+ JMXFilters.add(JMXFilter);
+ }
+
+ for (JMXFilter f: JMXFilters) {
+ // Retrieve the existing JMXFilters for the domain, add the new JMXFilters
+ LinkedList domainJMXFilters;
+ String domainName = f.getDomain();
+
+ if (includeJMXFiltersByDomain.containsKey(domainName)) {
+ domainJMXFilters = includeJMXFiltersByDomain.get(domainName);
+ } else {
+ domainJMXFilters = new LinkedList();
+ }
+
+ domainJMXFilters.add(f);
+ includeJMXFiltersByDomain.put(domainName, domainJMXFilters);
+ }
+ }
+ return includeJMXFiltersByDomain;
+ }
+
+ /**
+ * Extract, among JMXFilters, bean key parameters in common.
+ *
+ * @param JMXFiltersByDomain JMXFilters by domain name
+ *
+ * @return common bean key parameters by domain name
+ */
+ private static HashMap> getCommonBeanKeysByDomain(HashMap> JMXFiltersByDomain){
+ HashMap> beanKeysIntersectionByDomain = new HashMap>();
+
+ for (Map.Entry> JMXFiltersEntry : JMXFiltersByDomain.entrySet()) {
+ String domainName = JMXFiltersEntry.getKey();
+ LinkedList mJMXFilters= JMXFiltersEntry.getValue();
+
+ // Compute keys intersection
+ Set keysIntersection = new HashSet(mJMXFilters.getFirst().keySet());
+
+ for (JMXFilter f: mJMXFilters) {
+ keysIntersection.retainAll(f.keySet());
+ }
+
+ // Remove special parameters
+ for(String param : JMXAttribute.getExcludedBeanParams()){
+ keysIntersection.remove(param);
+ }
+ beanKeysIntersectionByDomain.put(domainName, keysIntersection);
+ }
+
+ return beanKeysIntersectionByDomain;
+ }
+
+ /**
+ * Build a map of common bean keys->values, with the specified bean keys, among the given JMXFilters.
+ *
+ * @param beanKeysByDomain bean keys by domain name
+ * @param JMXFiltersByDomain JMXFilters by domain name
+ *
+ * @return bean pattern (keys->values) by domain name
+ */
+ private static HashMap> getCommonScopeByDomain(HashMap> beanKeysByDomain, HashMap> JMXFiltersByDomain){
+ // Compute a common scope a among JMXFilters by domain name
+ HashMap> commonScopeByDomain = new HashMap>();
+
+ for (Map.Entry> commonParametersByDomainEntry : beanKeysByDomain.entrySet()) {
+ String domainName = commonParametersByDomainEntry.getKey();
+ Set commonParameters = commonParametersByDomainEntry.getValue();
+ LinkedList JMXFilters = JMXFiltersByDomain.get(domainName);
+ LinkedHashMap commonScope = new LinkedHashMap();
+
+ for (String parameter : commonParameters) {
+ // Check if all values associated with the parameters are the same
+ String commonValue = null;
+ Boolean hasCommonValue = true;
+
+ for (JMXFilter f : JMXFilters) {
+ ArrayList parameterValues = f.getParameterValues(parameter);
+
+ if (parameterValues.size() != 1 || (commonValue != null && !commonValue.equals(parameterValues.get(0)))) {
+ hasCommonValue = false;
+ break;
+ }
+ commonValue = parameterValues.get(0);
+
+ }
+ if (hasCommonValue) {
+ commonScope.put(parameter, commonValue);
+ }
+ }
+ commonScopeByDomain.put(domainName, commonScope);
+ }
+
+ return commonScopeByDomain;
+ }
+
+ /**
+ * Stringify a bean pattern.
+ *
+ * @param domain domain name
+ * @param beanScope map of bean keys-> values
+ *
+ * @return string pattern identifying the bean scope
+ */
+ private static String beanScopeToString(String domain, LinkedHashMap beanScope){
+ String result = "";
+
+ // Domain
+ domain = (domain != null) ? domain : "*";
+ result += domain + ":";
+
+ // Scope parameters
+ for (Map.Entry beanScopeEntry : beanScope.entrySet()) {
+ String param = beanScopeEntry.getKey();
+ String value = beanScopeEntry.getValue();
+
+ result += param + "=" + value + ",";
+ }
+ result += "*";
+
+ return result;
+ }
+
+ /**
+ * Find, among the JMXConfiguration list, a potential common bean pattern by domain name.
+ *
+ * @param JMXConfigurationList the JMXConfiguration list to process
+ *
+ * @return common bean pattern strings
+ */
+ public static LinkedList getGreatestCommonScopes(LinkedList JMXConfigurationList){
+ LinkedList result = new LinkedList();
+ if (JMXConfigurationList == null || JMXConfigurationList.isEmpty()) {
+ return result;
+ }
+ LinkedList includeConfigList = getIncludeConfigurationList(JMXConfigurationList);
+ HashMap> includeJMXFiltersByDomain = getIncludeJMXFiltersByDomain(includeConfigList);
+ HashMap> parametersIntersectionByDomain = getCommonBeanKeysByDomain(includeJMXFiltersByDomain);
+ HashMap> commonBeanScopeByDomain = getCommonScopeByDomain(parametersIntersectionByDomain, includeJMXFiltersByDomain);
+
+ for (Map.Entry> beanScopeEntry: commonBeanScopeByDomain.entrySet()) {
+ String domain = beanScopeEntry.getKey();
+ LinkedHashMap beanScope = beanScopeEntry.getValue();
+
+ result.add(beanScopeToString(domain, beanScope));
+ }
+
+ return result;
+ }
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/model/JMXFilter.java b/src/main/java/org/gnuhpc/bigdata/model/JMXFilter.java
new file mode 100644
index 0000000..79c7e25
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/model/JMXFilter.java
@@ -0,0 +1,180 @@
+package org.gnuhpc.bigdata.model;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.*;
+import java.util.regex.Pattern;
+
+@Getter
+@Setter
+public class JMXFilter {
+ HashMap filter;
+ Pattern domainRegex;
+ ArrayList beanRegexes = null;
+ //ArrayList excludeTags = null;
+ //HashMap additionalTags = null;
+
+ /**
+ * A simple class to manipulate include/exclude filter elements more easily
+ * A filter may contain:
+ * - A domain (key: 'domain') or a domain regex (key: 'domain_regex')
+ * - Bean names (key: 'bean' or 'bean_name') or bean regexes (key: 'bean_regex')
+ * - Attributes (key: 'attribute')
+ * - Additional bean parameters (other keys)
+ */
+ @JsonCreator
+ @SuppressWarnings("unchecked")
+ public JMXFilter(Object filter) {
+ HashMap castFilter;
+ if (filter != null) {
+ castFilter = (HashMap) filter;
+ } else {
+ castFilter = new HashMap();
+ }
+ this.filter = castFilter;
+ }
+
+ public String toString() {
+ return this.filter.toString();
+ }
+
+ public Set keySet() {
+ return filter.keySet();
+ }
+
+ @SuppressWarnings({ "unchecked", "serial" })
+ private static ArrayList toStringArrayList(final Object toCast) {
+ // Return object as an ArrayList wherever it's defined as
+ // list or not
+ //
+ // ### Example
+ // object:
+ // - firstValue
+ // - secondValue
+ // ### OR
+ // object: singleValue
+ // ###
+ if (toCast instanceof String) {
+ ArrayList toCastList = new ArrayList<>();
+ toCastList.add(toCast.toString());
+ return toCastList;
+ }
+ return (ArrayList) toCast;
+ }
+
+
+ public ArrayList getBeanNames() {
+ if (isEmptyBeanName()){
+ return new ArrayList();
+ }
+ final Object beanNames = (filter.get("bean") != null) ? filter.get("bean") : filter.get("bean_name");
+ // Return bean names as an ArrayList wherever it's defined as
+ // list or not
+ //
+ // ### Example
+ // bean:
+ // - org.apache.cassandra.db:type=Caches,keyspace=system,cache=HintsColumnFamilyKeyCache
+ // - org.datadog.jmxfetch.test:type=type=SimpleTestJavaApp
+ // ### OR
+ // bean: org.datadog.jmxfetch.test:type=type=SimpleTestJavaApp
+ // ###
+ return toStringArrayList(beanNames);
+ }
+
+ private static ArrayList toPatternArrayList(final Object toCast) {
+ ArrayList patternArrayList = new ArrayList();
+ ArrayList stringArrayList = toStringArrayList(toCast);
+ for (String string : stringArrayList) {
+ patternArrayList.add(Pattern.compile(string));
+ }
+
+ return patternArrayList;
+ }
+
+ public ArrayList getBeanRegexes() {
+ // Return bean regexes as an ArrayList of Pattern whether it's defined as
+ // a list or not
+
+ if (this.beanRegexes == null) {
+ if (filter.get("bean_regex") == null){
+ this.beanRegexes = new ArrayList();
+ } else {
+ final Object beanRegexNames = filter.get("bean_regex");
+ this.beanRegexes = toPatternArrayList(beanRegexNames);
+ }
+ }
+
+ return this.beanRegexes;
+ }
+
+ /*
+ public ArrayList getExcludeTags() {
+ // Return excluded tags as an ArrayList whether it's defined as a list or not
+
+ if (this.excludeTags == null) {
+ if (filter.get("exclude_tags") == null){
+ this.excludeTags = new ArrayList();
+ } else {
+ final Object exclude_tags = filter.get("exclude_tags");
+ this.excludeTags = toStringArrayList(exclude_tags);
+ }
+ }
+
+ return this.excludeTags;
+ }
+
+ public HashMap getAdditionalTags() {
+ // Return additional tags
+ if (this.additionalTags == null) {
+ if (filter.get("tags") == null){
+ this.additionalTags = new HashMap();
+ } else {
+ this.additionalTags = (HashMap)filter.get("tags");
+ }
+ }
+
+ return this.additionalTags;
+ }
+ */
+
+ public String getDomain() {
+ return (String) filter.get("domain");
+ }
+
+ public Pattern getDomainRegex() {
+ if (this.filter.get("domain_regex") == null) {
+ return null;
+ }
+
+ if (this.domainRegex == null) {
+ this.domainRegex = Pattern.compile((String) this.filter.get("domain_regex"));
+ }
+
+ return this.domainRegex;
+ }
+
+ public Object getAttribute() {
+ return filter.get("attribute");
+ }
+
+ public ArrayList getParameterValues(String parameterName) {
+ // Return bean attributes values as an ArrayList wherever it's defined as
+ // list or not
+ //
+ // ### Example
+ // bean_parameter:
+ // - exampleType1
+ // - exampleType2
+ // ### OR
+ // bean_parameter: onlyOneType
+ // ###
+ final Object beanValues = filter.get(parameterName);
+ return toStringArrayList(beanValues);
+ }
+
+ public boolean isEmptyBeanName() {
+ return (filter.get("bean") == null && filter.get("bean_name") == null);
+ }
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/model/JMXMetricData.java b/src/main/java/org/gnuhpc/bigdata/model/JMXMetricData.java
new file mode 100644
index 0000000..5ee3876
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/model/JMXMetricData.java
@@ -0,0 +1,26 @@
+package org.gnuhpc.bigdata.model;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.LinkedList;
+
+@Getter
+@Setter
+public class JMXMetricData {
+ private String host;
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
+ private LocalDateTime timestamp;
+ private Boolean collected;
+ private LinkedList> metrics;
+ private String msg;
+
+ public JMXMetricData(String host, LinkedList> metrics) {
+ this.host = host;
+ this.timestamp = LocalDateTime.now();
+ this.metrics = metrics;
+ }
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/model/JMXMetricDataV1.java b/src/main/java/org/gnuhpc/bigdata/model/JMXMetricDataV1.java
new file mode 100644
index 0000000..8ee573e
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/model/JMXMetricDataV1.java
@@ -0,0 +1,25 @@
+package org.gnuhpc.bigdata.model;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class JMXMetricDataV1 {
+ private String host;
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
+ private LocalDateTime timestamp;
+ private Boolean collected;
+ private Map mbeanInfo;
+ private String msg;
+
+ public JMXMetricDataV1(String host, Map mbeanInfo) {
+ this.host = host;
+ this.timestamp = LocalDateTime.now();
+ this.mbeanInfo = mbeanInfo;
+ }
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/model/JMXQuery.java b/src/main/java/org/gnuhpc/bigdata/model/JMXQuery.java
new file mode 100644
index 0000000..bf03ad2
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/model/JMXQuery.java
@@ -0,0 +1,13 @@
+package org.gnuhpc.bigdata.model;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.LinkedList;
+
+
+@Getter
+@Setter
+public class JMXQuery {
+ private LinkedList filters;
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/model/JMXSimpleAttribute.java b/src/main/java/org/gnuhpc/bigdata/model/JMXSimpleAttribute.java
new file mode 100644
index 0000000..0ddd4f4
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/model/JMXSimpleAttribute.java
@@ -0,0 +1,105 @@
+package org.gnuhpc.bigdata.model;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import javax.management.*;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+
+@Getter
+@Setter
+public class JMXSimpleAttribute extends JMXAttribute {
+ private String metricType;
+
+ public JMXSimpleAttribute(MBeanAttributeInfo attribute, ObjectName beanName, MBeanServerConnection connection) {
+ super(attribute, beanName, connection);
+ }
+
+ @Override
+ public LinkedList> getMetrics() throws AttributeNotFoundException,
+ InstanceNotFoundException, MBeanException, ReflectionException, IOException {
+ HashMap metric = new HashMap();
+
+ metric.put("domain", getBeanName().getDomain());
+ metric.put("beanName", getBeanName().toString());
+ metric.put("attributeName", getAttributeName());
+ metric.put("alias", getAlias());
+ metric.put("value", castToDouble(getValue(), null));
+ //metric.put("tags", getTags());
+ metric.put("metric_type", getMetricType());
+ LinkedList> metrics = new LinkedList>();
+ metrics.add(metric);
+ return metrics;
+ }
+
+ public boolean match(JMXConfiguration configuration) {
+ return matchDomain(configuration)
+ && matchBean(configuration)
+ && matchAttribute(configuration)
+ && !(
+ excludeMatchDomain(configuration)
+ || excludeMatchBean(configuration)
+ || excludeMatchAttribute(configuration));
+
+ }
+
+ private boolean matchAttribute(JMXConfiguration configuration) {
+ JMXFilter include = configuration.getInclude();
+ if (include.getAttribute() == null) {
+ return true;
+ } else if ((include.getAttribute() instanceof LinkedHashMap, ?>)
+ && ((LinkedHashMap) (include.getAttribute())).containsKey(getAttributeName())) {
+ return true;
+
+ } else if ((include.getAttribute() instanceof ArrayList>
+ && ((ArrayList) (include.getAttribute())).contains(getAttributeName()))) {
+ return true;
+ }
+
+ return false;
+ }
+
+ private boolean excludeMatchAttribute(JMXConfiguration configuration) {
+ JMXFilter exclude = configuration.getExclude();
+ if (exclude == null) return false;
+ if (exclude.getAttribute() == null) {
+ return false;
+ } else if ((exclude.getAttribute() instanceof LinkedHashMap, ?>)
+ && ((LinkedHashMap) (exclude.getAttribute())).containsKey(getAttributeName())) {
+ return true;
+
+ } else if ((exclude.getAttribute() instanceof ArrayList>
+ && ((ArrayList) (exclude.getAttribute())).contains(getAttributeName()))) {
+ return true;
+ }
+ return false;
+ }
+
+ private Object getValue() throws AttributeNotFoundException, InstanceNotFoundException, MBeanException,
+ ReflectionException, IOException, NumberFormatException {
+ return this.getJmxValue();
+ }
+
+ private String getMetricType() {
+ JMXFilter include = getMatchingConf().getInclude();
+ if (metricType != null) {
+ return metricType;
+ } else if (include.getAttribute() instanceof LinkedHashMap, ?>) {
+ LinkedHashMap> attribute = (LinkedHashMap>) (include.getAttribute());
+ metricType = attribute.get(getAttributeName()).get(METRIC_TYPE);
+ if (metricType == null) {
+ metricType = attribute.get(getAttributeName()).get("type");
+ }
+ }
+
+ if (metricType == null) { // Default to gauge
+ metricType = "gauge";
+ }
+
+ return metricType;
+ }
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/model/JMXTabularAttribute.java b/src/main/java/org/gnuhpc/bigdata/model/JMXTabularAttribute.java
new file mode 100644
index 0000000..916adc3
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/model/JMXTabularAttribute.java
@@ -0,0 +1,260 @@
+package org.gnuhpc.bigdata.model;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.log4j.Log4j;
+
+import javax.management.*;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.InvalidKeyException;
+import javax.management.openmbean.TabularData;
+import java.io.IOException;
+import java.util.*;
+
+@Getter
+@Setter
+@Log4j
+public class JMXTabularAttribute extends JMXAttribute {
+
+ private HashMap>> subAttributeList;
+
+ public JMXTabularAttribute(MBeanAttributeInfo attribute, ObjectName beanName, MBeanServerConnection connection) {
+ super(attribute, beanName, connection);
+ subAttributeList = new HashMap>>();
+ }
+
+ @Override
+ public LinkedList> getMetrics() throws AttributeNotFoundException,
+ InstanceNotFoundException, MBeanException, ReflectionException, IOException {
+ LinkedList> metrics = new LinkedList>();
+ HashMap>> subMetrics = new HashMap>>();
+
+ for (String dataKey : subAttributeList.keySet()) {
+ HashMap> subSub = subAttributeList.get(dataKey);
+ for (String metricKey : subSub.keySet()) {
+ String fullMetricKey = getAttributeName() + "." + metricKey;
+
+ HashMap metric = subSub.get(metricKey);
+ metric.put("domain", getBeanName().getDomain());
+ metric.put("beanName", getBeanName().toString());
+ metric.put("attributeName", fullMetricKey);
+ if (metric.get(ALIAS) == null) {
+ metric.put(ALIAS, convertMetricName(getAlias(metricKey)));
+ }
+
+ if (metric.get(METRIC_TYPE) == null) {
+ metric.put(METRIC_TYPE, getMetricType(metricKey));
+ }
+
+ /*
+ if (metric.get("tags") == null) {
+ metric.put("tags", getTags(dataKey, metricKey));
+ }*/
+
+ metric.put("value", castToDouble(getValue(dataKey, metricKey), null));
+
+ if(!subMetrics.containsKey(fullMetricKey)) {
+ subMetrics.put(fullMetricKey, new LinkedList>());
+ }
+ subMetrics.get(fullMetricKey).add(metric);
+ }
+ }
+
+ for (String key : subMetrics.keySet()) {
+ // only add explicitly included metrics
+ if (getAttributesFor(key) != null) {
+ metrics.addAll(sortAndFilter(key, subMetrics.get(key)));
+ }
+ }
+
+ return metrics;
+ }
+
+ private Object getMetricType(String subAttribute) {
+ String subAttributeName = getAttribute().getName() + "." + subAttribute;
+ String metricType = null;
+
+ JMXFilter include = getMatchingConf().getInclude();
+ if (include.getAttribute() instanceof LinkedHashMap, ?>) {
+ LinkedHashMap> attribute = (LinkedHashMap>) (include.getAttribute());
+ metricType = attribute.get(subAttributeName).get(METRIC_TYPE);
+ if (metricType == null) {
+ metricType = attribute.get(subAttributeName).get("type");
+ }
+ }
+
+ if (metricType == null) {
+ metricType = "gauge";
+ }
+
+ return metricType;
+ }
+
+ private Object getValue(String key, String subAttribute) throws AttributeNotFoundException,
+ InstanceNotFoundException,
+ MBeanException, ReflectionException, IOException {
+
+ try{
+ Object value = this.getJmxValue();
+ String attributeType = getAttribute().getType();
+
+ TabularData data = (TabularData) value;
+ for (Object rowKey : data.keySet()) {
+ Collection keys = (Collection) rowKey;
+ String pathKey = getMultiKey(keys);
+ if (key.equals(pathKey)) {
+ CompositeData compositeData = data.get(keys.toArray());
+ if (subAttribute.contains(".")) {
+ // walk down the path
+ Object o;
+ for (String subPathKey : subAttribute.split("\\.")) {
+ o = compositeData.get(subPathKey);
+ if (o instanceof CompositeData) {
+ compositeData = (CompositeData) o;
+ } else {
+ return compositeData.get(subPathKey);
+ }
+ }
+ } else {
+ return compositeData.get(subAttribute);
+ }
+ }
+ }
+ }
+ catch (InvalidKeyException e) {
+ log.warn("`"+getAttribute().getName()+"` attribute does not have a `"+subAttribute+"` key.");
+ return null;
+ }
+
+ throw new NumberFormatException();
+ }
+
+ private Map getAttributesFor(String key) {
+ JMXFilter include = getMatchingConf().getInclude();
+ if (include != null) {
+ Object includeAttribute = include.getAttribute();
+ if (includeAttribute instanceof LinkedHashMap, ?>) {
+ return (Map) ((Map)includeAttribute).get(key);
+ }
+ }
+ return null;
+ }
+
+ private List> sortAndFilter(String metricKey, LinkedList>
+ metrics) {
+ Map attributes = getAttributesFor(metricKey);
+ if (!attributes.containsKey("limit")) {
+ return metrics;
+ }
+ Integer limit = (Integer) attributes.get("limit");
+ if (metrics.size() <= limit) {
+ return metrics;
+ }
+ MetricComparator comp = new MetricComparator();
+ Collections.sort(metrics, comp);
+ String sort = (String) attributes.get("sort");
+ if (sort == null || sort.equals("desc")) {
+ metrics.subList(0, limit).clear();
+ } else {
+ metrics.subList(metrics.size() - limit, metrics.size()).clear();
+ }
+ return metrics;
+ }
+
+ private class MetricComparator implements Comparator> {
+ public int compare(HashMap o1, HashMap o2) {
+ Double v1 = (Double) o1.get("value");
+ Double v2 = (Double) o2.get("value");
+ return v1.compareTo(v2);
+ }
+ }
+
+ @Override
+ public boolean match(JMXConfiguration configuration) {
+ if (!matchDomain(configuration)
+ || !matchBean(configuration)
+ || excludeMatchDomain(configuration)
+ || excludeMatchBean(configuration)) {
+ return false;
+ }
+
+ try {
+ populateSubAttributeList(getJmxValue());
+ } catch (Exception e) {
+ return false;
+ }
+
+ return matchAttribute(configuration);//TODO && !excludeMatchAttribute(configuration);
+ }
+
+ private void populateSubAttributeList(Object value) {
+ TabularData data = (TabularData) value;
+ for (Object rowKey : data.keySet()) {
+ Collection keys = (Collection) rowKey;
+ CompositeData compositeData = data.get(keys.toArray());
+ String pathKey = getMultiKey(keys);
+ HashMap> subAttributes = new HashMap>();
+ for (String key : compositeData.getCompositeType().keySet()) {
+ if (compositeData.get(key) instanceof CompositeData) {
+ for (String subKey : ((CompositeData) compositeData.get(key)).getCompositeType().keySet()) {
+ subAttributes.put(key + "." + subKey, new HashMap());
+ }
+ } else {
+ subAttributes.put(key, new HashMap());
+ }
+ }
+ subAttributeList.put(pathKey, subAttributes);
+ }
+ }
+
+ private boolean matchAttribute(JMXConfiguration configuration) {
+ if (matchSubAttribute(configuration.getInclude(), getAttributeName(), true)) {
+ return true;
+ }
+
+ Iterator it1 = subAttributeList.keySet().iterator();
+ while (it1.hasNext()) {
+ String key = it1.next();
+ HashMap> subSub = subAttributeList.get(key);
+ Iterator it2 = subSub.keySet().iterator();
+ while (it2.hasNext()) {
+ String subKey = it2.next();
+ if (!matchSubAttribute(configuration.getInclude(), getAttributeName() + "." + subKey, true)) {
+ it2.remove();
+ }
+ }
+ if (subSub.size() <= 0) {
+ it1.remove();
+ }
+ }
+
+ return subAttributeList.size() > 0;
+ }
+
+ private String getMultiKey(Collection keys) {
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ for (Object key : keys) {
+ if (!first) { sb.append(","); }
+ // I hope these have sane toString() methods
+ sb.append(key.toString());
+ first = false;
+ }
+ return sb.toString();
+ }
+
+ private boolean matchSubAttribute(JMXFilter params, String subAttributeName, boolean matchOnEmpty) {
+ if ((params.getAttribute() instanceof LinkedHashMap, ?>)
+ && ((LinkedHashMap) (params.getAttribute())).containsKey(subAttributeName)) {
+ return true;
+ } else if ((params.getAttribute() instanceof ArrayList>
+ && ((ArrayList) (params.getAttribute())).contains(subAttributeName))) {
+ return true;
+ } else if (params.getAttribute() == null) {
+ return matchOnEmpty;
+ }
+ return false;
+ }
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/model/User.java b/src/main/java/org/gnuhpc/bigdata/model/User.java
new file mode 100644
index 0000000..be06e96
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/model/User.java
@@ -0,0 +1,26 @@
+package org.gnuhpc.bigdata.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.hibernate.validator.constraints.NotBlank;
+import org.hibernate.validator.constraints.NotEmpty;
+
+import javax.validation.constraints.NotNull;
+
+@Getter
+@Setter
+@AllArgsConstructor
+public class User {
+ @NotNull(message = "Username can not be null.")
+ @NotBlank(message = "Username can not be blank.")
+ private String username;
+
+ @NotNull(message = "Password can not be null.")
+ @NotBlank(message = "Password can not be blank.")
+ private String password;
+
+ @NotNull(message = "Role can not be null.")
+ @NotBlank(message = "Role can not be blank.")
+ private String role;
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/security/BasicAuthenticationPoint.java b/src/main/java/org/gnuhpc/bigdata/security/BasicAuthenticationPoint.java
new file mode 100644
index 0000000..3c2d795
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/security/BasicAuthenticationPoint.java
@@ -0,0 +1,53 @@
+package org.gnuhpc.bigdata.security;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import lombok.NoArgsConstructor;
+import org.gnuhpc.bigdata.exception.RestErrorResponse;
+import org.springframework.boot.jackson.JsonComponent;
+import org.springframework.http.HttpStatus;
+import org.springframework.security.core.AuthenticationException;
+import org.springframework.security.web.authentication.www.BasicAuthenticationEntryPoint;
+import org.springframework.stereotype.Component;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+@Component
+public class BasicAuthenticationPoint extends BasicAuthenticationEntryPoint {
+ @Override
+ public void commence(HttpServletRequest request, HttpServletResponse response, AuthenticationException authEx)
+ throws IOException, ServletException {
+ response.addHeader("WWW-Authenticate", "Basic realm=" +getRealmName());
+ response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
+ String error = "Authenciation Error:" + authEx.getClass().getCanonicalName();
+ RestErrorResponse restAuthenticationError = new RestErrorResponse(HttpStatus.UNAUTHORIZED, error, authEx);
+ ObjectMapper mapper = new ObjectMapper();
+ JavaTimeModule javaTimeModule = new JavaTimeModule();
+ javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer());
+ mapper.registerModule(javaTimeModule);
+ response.getWriter().print(mapper.writeValueAsString(restAuthenticationError));
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ setRealmName("Contact Big Data Infrastructure Team to get available accounts.");
+ super.afterPropertiesSet();
+ }
+
+ @JsonComponent
+ @NoArgsConstructor
+ private class LocalDateTimeSerializer extends JsonSerializer {
+ @Override
+ public void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider sp) throws IOException{
+ gen.writeString(value.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
+ }
+ }
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/security/UserDetailsServiceImp.java b/src/main/java/org/gnuhpc/bigdata/security/UserDetailsServiceImp.java
new file mode 100644
index 0000000..2bb09e7
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/security/UserDetailsServiceImp.java
@@ -0,0 +1,89 @@
+package org.gnuhpc.bigdata.security;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.log4j.Log4j;
+import org.gnuhpc.bigdata.config.WebSecurityConfig;
+import org.gnuhpc.bigdata.model.User;
+import org.gnuhpc.bigdata.utils.CommonUtils;
+import org.springframework.security.core.userdetails.User.UserBuilder;
+import org.springframework.security.core.userdetails.UserDetails;
+import org.springframework.security.core.userdetails.UserDetailsService;
+import org.springframework.security.core.userdetails.UsernameNotFoundException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Log4j
+public class UserDetailsServiceImp implements UserDetailsService {
+ private ScheduledExecutorService securityFileChecker;
+ private ArrayList userList = new ArrayList<>();
+
+ public UserDetailsServiceImp(boolean checkSecurity, int checkInitDelay, int checkSecurityInterval) {
+ if (checkSecurity) {
+ securityFileChecker = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("securityFileChecker").build());
+ securityFileChecker.scheduleWithFixedDelay(new SecurityFileCheckerRunnable(),
+ checkInitDelay, checkSecurityInterval, TimeUnit.SECONDS);
+ userList = fetchUserListFromSecurtiyFile();
+ }
+ }
+
+ @Override
+ public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {
+ User user = findUserByUsername(username);
+
+ UserBuilder builder;
+ if (user != null) {
+ builder = org.springframework.security.core.userdetails.User.withUsername(username);
+ builder.password(user.getPassword());
+ builder.roles(user.getRole());
+ } else {
+ throw new UsernameNotFoundException("User not found.");
+ }
+
+ return builder.build();
+ }
+
+ private User findUserByUsername(String username) {
+ for (User user:userList) {
+ if (username.equals(user.getUsername())) {
+ return user;
+ }
+ }
+ return null;
+ }
+
+ private ArrayList fetchUserListFromSecurtiyFile() {
+ String securityFilePath = WebSecurityConfig.SECURITY_FILE_PATH;
+ try {
+ HashMap accounts = CommonUtils.yamlParse(securityFilePath);
+ userList.clear();
+ accounts.forEach((key, value)->{
+ String username = (String)key;
+ Map userInfo = (Map)value;
+ userList.add(new User(username, userInfo.get("password"), userInfo.get("role")));
+ });
+ } catch (IOException ioException) {
+ log.error("Security file process exception.", ioException);
+ }
+
+ return userList;
+ }
+
+ private class SecurityFileCheckerRunnable implements Runnable {
+ @Override
+ public void run() {
+ try {
+ userList = fetchUserListFromSecurtiyFile();
+ } catch (Throwable t) {
+ log.error("Uncaught exception in SecurityFileChecker thread", t);
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/service/CollectorService.java b/src/main/java/org/gnuhpc/bigdata/service/CollectorService.java
new file mode 100644
index 0000000..e3bcade
--- /dev/null
+++ b/src/main/java/org/gnuhpc/bigdata/service/CollectorService.java
@@ -0,0 +1,248 @@
+package org.gnuhpc.bigdata.service;
+
+import lombok.extern.log4j.Log4j;
+import org.gnuhpc.bigdata.config.JMXConfig;
+import org.gnuhpc.bigdata.exception.CollectorException;
+import org.gnuhpc.bigdata.model.*;
+import org.gnuhpc.bigdata.utils.CommonUtils;
+import org.json.JSONObject;
+import org.springframework.stereotype.Service;
+import org.springframework.validation.annotation.Validated;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@Service
+@Log4j
+@Validated
+public class CollectorService {
+ private final static List SIMPLE_TYPES = Arrays.asList("long",
+ "java.lang.String", "int", "float", "double", "java.lang.Double","java.lang.Float", "java.lang.Integer", "java.lang.Long",
+ "java.util.concurrent.atomic.AtomicInteger", "java.util.concurrent.atomic.AtomicLong",
+ "java.lang.Object", "java.lang.Boolean", "boolean", "java.lang.Number");
+ private final static List COMPOSED_TYPES = Arrays.asList("javax.management.openmbean.CompositeData", "java.util.HashMap", "java.util.Map");
+ private final static List MULTI_TYPES = Arrays.asList("javax.management.openmbean.TabularData");
+
+ public List collectJMXData(String jmxurl) {
+ LinkedList jmxMetricDataList = new LinkedList<>();
+ String[] hostList = jmxurl.split(",");
+ for (String host : hostList) {
+ JMXClient jmxClient = new JMXClient(host);
+ Map metricData = new HashMap<>();
+ JMXMetricDataV1 jmxMetricData = new JMXMetricDataV1(host, metricData);
+ try {
+ log.info("Start to collect JMXServiceURL:" + jmxClient.getJmxServiceURL());
+ jmxClient.connectWithTimeout();
+ MBeanServerConnection mBeanServerConnection = jmxClient.getJmxConnector().getMBeanServerConnection();
+ Set objectNames = mBeanServerConnection.queryNames(null, null);
+ for (ObjectName objectName : objectNames) {
+ Map attributeInfoMap = getAttributeInfoByObjectName(mBeanServerConnection, objectName);
+ metricData.put(objectName.toString(), attributeInfoMap);
+ }
+ jmxMetricData.setCollected(true);
+ } catch (Exception e) {
+ jmxMetricData.setCollected(false);
+ CollectorException ce = new CollectorException(String.format("%s occurred. URL: %s. Reason: %s",
+ e.getClass().getCanonicalName(), jmxClient.getJmxServiceURL(), e.getCause()), e);
+ jmxMetricData.setMsg(ce.getLocalizedMessage());
+ log.error("Failed to connect to " + jmxClient.getJmxServiceURL(), ce);
+ } finally {
+ jmxMetricDataList.add(jmxMetricData);
+ if (jmxClient.getJmxConnector() != null) {
+ try {
+ jmxClient.close();
+ } catch (Throwable t) {
+ log.error("Connection close error occurred. ", t);
+ }
+ }
+ }
+ }
+
+ return jmxMetricDataList;
+ }
+
+ public List collectJMXData(String jmxurl, JMXQuery jmxQuery) {
+ List jmxMetricDataList = new ArrayList<>();
+ LinkedList configurationList = jmxQuery.getFilters();
+ LinkedList beanScopes = JMXConfiguration.getGreatestCommonScopes(configurationList);
+ Set beans = new HashSet<>();
+ LinkedList matchingAttributes = new LinkedList<>();
+ LinkedList> metrics = new LinkedList<>();
+
+ String[] hostList = jmxurl.split(",");
+
+ for (String host : hostList) {
+ JMXClient jmxClient = new JMXClient(host);
+ beans.clear();
+ matchingAttributes.clear();
+ metrics.clear();
+ JMXMetricData jmxMetricData = new JMXMetricData(host, metrics);
+ try {
+ jmxClient.connectWithTimeout();
+ MBeanServerConnection mBeanServerConnection = jmxClient.getJmxConnector().getMBeanServerConnection();
+ for (String scope : beanScopes) {
+ ObjectName name = new ObjectName(scope);
+ beans.addAll(mBeanServerConnection.queryNames(name, null));
+ }
+ beans = (beans.isEmpty()) ? mBeanServerConnection.queryNames(null, null) : beans;
+ getMatchingAttributes(matchingAttributes, mBeanServerConnection, beans, configurationList);
+ jmxMetricData.setMetrics(getMetrics(matchingAttributes));
+ jmxMetricData.setCollected(true);
+ } catch (Exception e) {
+ jmxMetricData.setCollected(false);
+ CollectorException ce = new CollectorException(String.format("%s occurred. URL: %s. Reason: %s",
+ e.getClass().getCanonicalName(), jmxClient.getJmxServiceURL(), e.getCause()), e);
+ jmxMetricData.setMsg(ce.getLocalizedMessage());
+ log.error("Failed to connect to " + jmxClient.getJmxServiceURL(), ce);
+ } finally {
+ jmxMetricDataList.add(jmxMetricData);
+ try {
+ if (jmxClient.getJmxConnector() != null) {
+ jmxClient.close();
+ }
+ } catch (Throwable t) {
+ log.error("Connection close error occurred. ", t);
+ }
+ }
+ }
+ return jmxMetricDataList;
+ }
+
+ private void getMatchingAttributes(LinkedList matchingAttributes, MBeanServerConnection mBeanServerConnection, Set beans,
+ LinkedList configurationList) {
+ for (ObjectName beanName : beans) {
+ MBeanAttributeInfo[] attributeInfos;
+ try {
+ attributeInfos = mBeanServerConnection.getMBeanInfo(beanName).getAttributes();
+ } catch (Exception e) {
+ CollectorException ce = new CollectorException(String.format("Get bean's attributes exception. BeanName: %s. Reason: %s",
+ beanName, e.getCause()), e);
+ log.error("Failed to get bean attributes. BeanName is " + beanName, ce);
+ continue;
+ }
+
+ for (MBeanAttributeInfo attributeInfo: attributeInfos) {
+ JMXAttribute jmxAttribute;
+ String attributeType = attributeInfo.getType();
+ if (SIMPLE_TYPES.contains(attributeType)) {
+ log.debug(beanName + " : " + attributeInfo + " has attributeInfo simple type");
+ jmxAttribute = new JMXSimpleAttribute(attributeInfo, beanName, mBeanServerConnection);
+ } else if (COMPOSED_TYPES.contains(attributeType)) {
+ log.debug(beanName + " : " + attributeInfo + " has attributeInfo composite type");
+ jmxAttribute = new JMXComplexAttribute(attributeInfo, beanName, mBeanServerConnection);
+ } else if (MULTI_TYPES.contains(attributeType)) {
+ log.debug(beanName + " : " + attributeInfo + " has attributeInfo tabular type");
+ jmxAttribute = new JMXTabularAttribute(attributeInfo, beanName, mBeanServerConnection);
+ } else {
+ //try {
+ log.debug(beanName + " : " + attributeInfo + " has an unsupported type: " + attributeType);
+ //} catch (NullPointerException e) {
+ // log.error("Caught unexpected NullPointerException");
+ //}
+ continue;
+ }
+ for (JMXConfiguration conf: configurationList) {
+ if (jmxAttribute.match(conf)) {
+ jmxAttribute.setMatchingConf(conf);
+ matchingAttributes.add(jmxAttribute);
+ log.debug(" Matching Attribute: " + jmxAttribute.getAttributeName() +
+ ", BeanName:" + beanName.getCanonicalName());
+ }
+ }
+ }
+ }
+ }
+
+ private Map getAttributeInfoByObjectName(MBeanServerConnection mBeanServerConnection,
+ ObjectName objectName) {
+ Map attributeInfoMap = new HashMap<>();
+ try {
+ MBeanInfo mbeanInfo = mBeanServerConnection.getMBeanInfo(objectName);
+ MBeanAttributeInfo[] mBeanAttributeInfoList = mbeanInfo.getAttributes();
+ log.debug("objectName:" + objectName.toString());
+ for (MBeanAttributeInfo info : mBeanAttributeInfoList) {
+ String attributeName = info.getName();
+ String attributeValue = "";
+ try {
+ attributeValue = mBeanServerConnection.getAttribute(objectName, info.getName()).toString();
+ } catch (Exception e) {
+ attributeValue = "Unavailable";
+ log.info("Exception occured when collect ObjectName:" + objectName + ", AttributeName:" + attributeName, e);
+ }
+ attributeInfoMap.put(attributeName, attributeValue);
+ }
+ } catch (Exception e) {
+ attributeInfoMap.put("collected", "false");
+ log.info("Exception occured when collect ObjectName:" + objectName, e);
+ }
+ return attributeInfoMap;
+ }
+
+ public LinkedList> getMetrics(LinkedList matchingAttributes) throws IOException {
+ LinkedList> metrics = new LinkedList>();
+ Iterator it = matchingAttributes.iterator();
+
+ while (it.hasNext()) {
+ JMXAttribute jmxAttr = it.next();
+ try {
+ LinkedList> jmxAttrMetrics = jmxAttr.getMetrics();
+ for (HashMap m : jmxAttrMetrics) {
+ //m.put("check_name", this.checkName);
+ metrics.add(m);
+ JSONObject metricJson = new JSONObject(m);
+ }
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ log.debug("Cannot get metrics for attribute: " + jmxAttr, e);
+ }
+ }
+
+ return metrics;
+ }
+
+ public HashMap listJMXFilterTemplate(String filterKey) {
+ HashMap filterTemplateMap = new HashMap<>();
+ HashMap yamlHash;
+ String projectRootPath = "";
+ try {
+ File jmxFilterDir = new File(JMXConfig.JMX_FILTER_DIR);
+ if (!jmxFilterDir.exists() || !jmxFilterDir.isDirectory()) {
+ throw new IOException();
+ }
+ for (File yamlFile:jmxFilterDir.listFiles()) {
+ String fileFullName = yamlFile.getName();
+ log.info("Found JMXFilterTemplate filename=" + fileFullName);
+ if (matchIgnoreCase(filterKey, fileFullName)) {
+ String[] fileNames = fileFullName.split("\\.");
+ yamlHash = CommonUtils.yamlParse(yamlFile);
+ filterTemplateMap.put(fileNames[0], yamlHash);
+ }
+ }
+ } catch (IOException e) {
+ CollectorException ce = new CollectorException(String.format("%s occurred. Reason:%s. Advice:"+
+ "Create a directory named JMXFilterTemplate to include filter templates in the project root path:%s.",
+ e.getClass().getCanonicalName(), e.getLocalizedMessage(), projectRootPath), e);
+ log.error("JMXFilterTemplate path does not exist.");
+ filterTemplateMap.put("error", ce.getLocalizedMessage());
+ }
+
+ return filterTemplateMap;
+ }
+
+ boolean matchIgnoreCase(String regex, String string) {
+ Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(string);
+
+ boolean match = matcher.find();
+
+ return match;
+ }
+}
diff --git a/src/main/java/org/gnuhpc/bigdata/service/KafkaAdminService.java b/src/main/java/org/gnuhpc/bigdata/service/KafkaAdminService.java
index fc171a7..8fd363a 100644
--- a/src/main/java/org/gnuhpc/bigdata/service/KafkaAdminService.java
+++ b/src/main/java/org/gnuhpc/bigdata/service/KafkaAdminService.java
@@ -1,11 +1,38 @@
package org.gnuhpc.bigdata.service;
+import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializer;
-import kafka.admin.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.PostConstruct;
+import kafka.admin.AdminClient;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.admin.ReassignPartitionsCommand;
+import kafka.admin.ReassignmentStatus;
+import kafka.admin.TopicCommand;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.OffsetAndMetadata;
@@ -22,24 +49,34 @@
import lombok.extern.log4j.Log4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.serialization.StringDeserializer;
import org.gnuhpc.bigdata.CollectionConvertor;
import org.gnuhpc.bigdata.componet.OffsetStorage;
-import org.gnuhpc.bigdata.constant.ConsumerState;
+import org.gnuhpc.bigdata.config.KafkaConfig;
import org.gnuhpc.bigdata.constant.ConsumerType;
import org.gnuhpc.bigdata.constant.GeneralResponseState;
-import org.gnuhpc.bigdata.model.*;
+import org.gnuhpc.bigdata.model.AddPartition;
+import org.gnuhpc.bigdata.model.BrokerInfo;
+import org.gnuhpc.bigdata.model.ConsumerGroupDesc;
+import org.gnuhpc.bigdata.model.ConsumerGroupDescFactory;
+import org.gnuhpc.bigdata.model.GeneralResponse;
+import org.gnuhpc.bigdata.model.HealthCheckResult;
+import org.gnuhpc.bigdata.model.ReassignWrapper;
+import org.gnuhpc.bigdata.model.TopicBrief;
+import org.gnuhpc.bigdata.model.TopicDetail;
+import org.gnuhpc.bigdata.model.TopicMeta;
+import org.gnuhpc.bigdata.model.TopicPartitionInfo;
import org.gnuhpc.bigdata.task.FetchOffSetFromZKResult;
import org.gnuhpc.bigdata.task.FetchOffsetFromZKTask;
import org.gnuhpc.bigdata.utils.KafkaUtils;
@@ -55,995 +92,1219 @@
import scala.collection.JavaConverters;
import scala.collection.Seq;
-import javax.annotation.PostConstruct;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static java.lang.String.format;
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
-
-/**
- * Created by gnuhpc on 2017/7/17.
- */
-
+/** Created by gnuhpc on 2017/7/17. */
@Service
@Log4j
@Validated
public class KafkaAdminService {
- private static final int channelSocketTimeoutMs = 600;
- private static final int channelRetryBackoffMs = 600;
- private static final String CONSUMERPATHPREFIX = "/consumers/";
- private static final String OFFSETSPATHPREFIX = "/offsets/";
- @Autowired
- private ZookeeperUtils zookeeperUtils;
+ private static final int channelSocketTimeoutMs = 600;
+ private static final int channelRetryBackoffMs = 600;
+ private static final String CONSUMERPATHPREFIX = "/consumers/";
+ private static final String OFFSETSPATHPREFIX = "/offsets/";
+ @Autowired private ZookeeperUtils zookeeperUtils;
- @Autowired
- private KafkaUtils kafkaUtils;
+ @Autowired private KafkaUtils kafkaUtils;
- @Autowired
- private OffsetStorage storage;
+ @Autowired private KafkaConfig kafkaConfig;
- //For AdminUtils use
- private ZkUtils zkUtils;
+ @Autowired private OffsetStorage storage;
- //For zookeeper connection
- private CuratorFramework zkClient;
+ // For AdminUtils use
+ private ZkUtils zkUtils;
- //For Json serialized
- private Gson gson;
+ // For zookeeper connection
+ private CuratorFramework zkClient;
- private scala.Option NONE = scala.Option.apply(null);
+ // For Json serialized
+ private Gson gson;
- @PostConstruct
- private void init() {
- this.zkUtils = zookeeperUtils.getZkUtils();
- this.zkClient = zookeeperUtils.getCuratorClient();
- GsonBuilder builder = new GsonBuilder();
- builder.registerTypeAdapter(DateTime.class, (JsonDeserializer) (jsonElement, type, jsonDeserializationContext) -> new DateTime(jsonElement.getAsJsonPrimitive().getAsLong()));
+ private scala.Option NONE = scala.Option.apply(null);
- this.gson = builder.create();
- }
-
- public TopicMeta createTopic(TopicDetail topic, String reassignStr) {
- if (StringUtils.isEmpty(topic.getName())) {
- throw new InvalidTopicException("Empty topic name");
- }
-
- if (Topic.hasCollisionChars(topic.getName())) {
- throw new InvalidTopicException("Invalid topic name");
- }
-
- if (Strings.isNullOrEmpty(reassignStr) && topic.getPartitions() <= 0) {
- throw new InvalidTopicException("Number of partitions must be larger than 0");
- }
- Topic.validate(topic.getName());
-
-
- if (Strings.isNullOrEmpty(reassignStr)) {
- AdminUtils.createTopic(zkUtils,
- topic.getName(), topic.getPartitions(), topic.getFactor(),
- topic.getProp(), RackAwareMode.Enforced$.MODULE$);
- } else {
- List argsList = new ArrayList<>();
- argsList.add("--topic");
- argsList.add(topic.getName());
-
- if (topic.getProp().stringPropertyNames().size() != 0) {
- argsList.add("--config");
-
- for (String key : topic.getProp().stringPropertyNames()) {
- argsList.add(key + "=" + topic.getProp().get(key));
- }
- }
- argsList.add("--replica-assignment");
- argsList.add(reassignStr);
-
- TopicCommand.createTopic(zkUtils, new TopicCommand.TopicCommandOptions(argsList.stream().toArray(String[]::new)));
- }
-
-
- try {
- //Wait for a second for metadata propergating
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return describeTopic(topic.getName());
- }
-
- public List listTopics() {
- return CollectionConvertor.seqConvertJavaList(zkUtils.getAllTopics());
- }
-
- public List listTopicBrief() {
- KafkaConsumer consumer = kafkaUtils.createNewConsumer();
- Map> topicMap = consumer.listTopics();
- List result = topicMap.entrySet().parallelStream().map(e -> {
- String topic = e.getKey();
- long replicateCount = e.getValue().parallelStream().flatMap(pi -> Arrays.stream(pi.replicas())).count();
- long isrCount = e.getValue().parallelStream().flatMap(pi -> Arrays.stream(pi.inSyncReplicas())).count();
- if (replicateCount == 0) {
- return new TopicBrief(topic, e.getValue().size(), 0);
- } else {
- return new TopicBrief(topic, e.getValue().size(), ((double) isrCount / replicateCount));
- }
- }
- ).collect(toList());
-
- consumer.close();
-
- return result;
- }
+ @PostConstruct
+ private void init() {
+ this.zkUtils = zookeeperUtils.getZkUtils();
+ this.zkClient = zookeeperUtils.getCuratorClient();
+ GsonBuilder builder = new GsonBuilder();
+ builder.registerTypeAdapter(
+ DateTime.class,
+ (JsonDeserializer)
+ (jsonElement, type, jsonDeserializationContext) ->
+ new DateTime(jsonElement.getAsJsonPrimitive().getAsLong()));
- public boolean existTopic(String topicName) {
- return AdminUtils.topicExists(zkUtils, topicName);
- }
+ this.gson = builder.create();
+ }
- public List listBrokers() {
- List