原创

aop联合反射实现mysql与hbase的读写切换

1、能够做的事情

  • 开关控制mysql和hbase的读写切换,实时生效
  • 内置监控可以观察hbase的运行性能(刚切过去,性能稳定性有待考究)

2、核心的代码

package tech.chenxing.aop;

import tech.chenxing.conf.RgSystemConfig;
import tech.chenxing.util.SpringTool;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.Instant;

@Aspect
@Component
@Slf4j
public class MultiDataSourceAspect {

    @Autowired private RgSystemConfig rgSystemConfig;

    /** 定义一个切入点 */
    @Pointcut("@annotation(tech.chenxing.aop.MultiDataSource)")
    private void multiDataSourceAspect() {}

    @Around("multiDataSourceAspect()")
    public Object methodMonitor(ProceedingJoinPoint pjp) throws Throwable {
        // 获取方法签名
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        // 获取切入方法的对象
        Method method = signature.getMethod();
        // 获取方法上的Aop注解
        MultiDataSource annotation = method.getAnnotation(MultiDataSource.class);
        Class invokeClass = annotation.invokeClass();
        boolean readOperate = annotation.isReadOperate();
        boolean hbaseStable = rgSystemConfig.isHbaseStable();
        boolean hbaseDualWrite = rgSystemConfig.isHbaseDualWrite();
        if (readOperate) {
            if (hbaseStable) {
                return proceed(pjp);
            } else {
                return invokeRdsOperate(pjp, method, invokeClass);
            }
        } else {
            if (hbaseDualWrite) {
                proceed(pjp);
                // 拦截的方法参数
                return invokeRdsOperate(pjp, method, invokeClass);
            } else {
                if (hbaseStable) {
                    return proceed(pjp);
                } else {
                    return invokeRdsOperate(pjp, method, invokeClass);
                }
            }
        }
    }

    /**
     * 执行方法并关注耗时
     *
     * @param pjp
     * @return
     */
    private Object proceed(ProceedingJoinPoint pjp) throws Throwable {
        Object result = null;
        try {
            Long alarmThreshold = rgSystemConfig.getAlarmThreshold();
            Instant start = Instant.now();
            result = pjp.proceed();
            Instant end = Instant.now();
            long cost = Duration.between(start, end).toMillis();
            if (alarmThreshold > 0 && cost > alarmThreshold) {
                log.warn("hbase dal operate cost:{} ms", cost);
            }
        } catch (Throwable ex) {
            boolean hbaseStable = rgSystemConfig.isHbaseStable();
            if (hbaseStable) {
                throw ex;
            } else {
                log.warn("hbase dal error:{}", ex.getMessage());
            }
        }
        return result;
    }

    /**
     * 是否操作老的数据库
     *
     * @param pjp
     * @param method
     * @param invokeClass
     * @return
     * @throws NoSuchMethodException
     * @throws InvocationTargetException
     * @throws IllegalAccessException
     */
    private Object invokeRdsOperate(ProceedingJoinPoint pjp, Method method, Class invokeClass)
            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        Object[] args = pjp.getArgs();
        Class<?>[] methodParamArr = method.getParameterTypes();
        String methodName = method.getName();
        Object clz = SpringTool.getBean(invokeClass);
        Method invokeMethod = clz.getClass().getMethod(methodName, methodParamArr);
        return invokeMethod.invoke(clz, args);
    }
}
正文到此结束
本文目录