package com.xxl.job.core.executor; import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.impl.ExecutorBizImpl; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.thread.ExecutorRegistryThread; import com.xxl.job.core.thread.JobLogFileCleanThread; import com.xxl.job.core.thread.JobThread; import com.xxl.job.core.thread.TriggerCallbackThread; import com.xxl.rpc.registry.ServiceRegistry; import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory; import com.xxl.rpc.remoting.invoker.call.CallType; import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean; import com.xxl.rpc.remoting.invoker.route.LoadBalance; import com.xxl.rpc.remoting.net.NetEnum; import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory; import com.xxl.rpc.serialize.Serializer; import com.xxl.rpc.util.IpUtil; import com.xxl.rpc.util.NetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** * Created by xuxueli on 2016/3/2 21:14. */ public class XxlJobExecutor { private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class); // ---------------------- param ---------------------- private String adminAddresses; private String appName; private String ip; private int port; private String accessToken; private String logPath; private int logRetentionDays; public void setAdminAddresses(String adminAddresses) { this.adminAddresses = adminAddresses; } public void setAppName(String appName) { this.appName = appName; } public void setIp(String ip) { this.ip = ip; } public void setPort(int port) { this.port = port; } public void setAccessToken(String accessToken) { this.accessToken = accessToken; } public void setLogPath(String logPath) { this.logPath = logPath; } public void setLogRetentionDays(int logRetentionDays) { this.logRetentionDays = logRetentionDays; } // ---------------------- start + stop ---------------------- public void start() throws Exception { // init logpath XxlJobFileAppender.initLogPath(logPath); // init admin-client initAdminBizList(adminAddresses, accessToken); // init JobLogFileCleanThread JobLogFileCleanThread.getInstance().start(logRetentionDays); // init TriggerCallbackThread TriggerCallbackThread.getInstance().start(); // init executor-server port = port>0?port: NetUtil.findAvailablePort(9777); ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); initRpcProvider(ip, port, appName, accessToken); } public void destroy(){ // destory jobThreadRepository if (jobThreadRepository.size() > 0) { for (Map.Entry item: jobThreadRepository.entrySet()) { removeJobThread(item.getKey(), "web container destroy and kill the job."); } jobThreadRepository.clear(); } // destory JobLogFileCleanThread JobLogFileCleanThread.getInstance().toStop(); // destory TriggerCallbackThread TriggerCallbackThread.getInstance().toStop(); // destory executor-server stopRpcProvider(); } // ---------------------- admin-client (rpc invoker) ---------------------- private static List adminBizList; private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { if (adminAddresses!=null && adminAddresses.trim().length()>0) { for (String address: adminAddresses.trim().split(",")) { if (address!=null && address.trim().length()>0) { String addressUrl = address.concat(AdminBiz.MAPPING); AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean( NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC, LoadBalance.ROUND, AdminBiz.class, null, 10000, addressUrl, accessToken, null, null ).getObject(); if (adminBizList == null) { adminBizList = new ArrayList(); } adminBizList.add(adminBiz); } } } } public static List getAdminBizList(){ return adminBizList; } // ---------------------- executor-server (rpc provider) ---------------------- private XxlRpcProviderFactory xxlRpcProviderFactory = null; private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception { // init, provider factory String address = IpUtil.getIpPort(ip, port); Map serviceRegistryParam = new HashMap(); serviceRegistryParam.put("appName", appName); serviceRegistryParam.put("address", address); xxlRpcProviderFactory = new XxlRpcProviderFactory(); xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam); // add services xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl()); // start xxlRpcProviderFactory.start(); } public static class ExecutorServiceRegistry extends ServiceRegistry { @Override public void start(Map param) { // start registry ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address")); } @Override public void stop() { // stop registry ExecutorRegistryThread.getInstance().toStop(); } @Override public boolean registry(Set keys, String value) { return false; } @Override public boolean remove(Set keys, String value) { return false; } @Override public Map> discovery(Set keys) { return null; } @Override public TreeSet discovery(String key) { return null; } } private void stopRpcProvider() { // stop invoker factory try { XxlRpcInvokerFactory.getInstance().stop(); } catch (Exception e) { logger.error(e.getMessage(), e); } // stop provider factory try { xxlRpcProviderFactory.stop(); } catch (Exception e) { logger.error(e.getMessage(), e); } } // ---------------------- job handler repository ---------------------- private static ConcurrentHashMap jobHandlerRepository = new ConcurrentHashMap(); public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){ logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); return jobHandlerRepository.put(name, jobHandler); } public static IJobHandler loadJobHandler(String name){ return jobHandlerRepository.get(name); } // ---------------------- job thread repository ---------------------- private static ConcurrentHashMap jobThreadRepository = new ConcurrentHashMap(); public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ JobThread newJobThread = new JobThread(jobId, handler); newJobThread.start(); logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!! if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); } return newJobThread; } public static void removeJobThread(int jobId, String removeOldReason){ JobThread oldJobThread = jobThreadRepository.remove(jobId); if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); } } public static JobThread loadJobThread(int jobId){ JobThread jobThread = jobThreadRepository.get(jobId); return jobThread; } }