package com.xxl.job.core.executor;
|
|
import com.xxl.job.core.biz.AdminBiz;
|
import com.xxl.job.core.biz.ExecutorBiz;
|
import com.xxl.job.core.biz.impl.ExecutorBizImpl;
|
import com.xxl.job.core.handler.IJobHandler;
|
import com.xxl.job.core.log.XxlJobFileAppender;
|
import com.xxl.job.core.thread.ExecutorRegistryThread;
|
import com.xxl.job.core.thread.JobLogFileCleanThread;
|
import com.xxl.job.core.thread.JobThread;
|
import com.xxl.job.core.thread.TriggerCallbackThread;
|
import com.xxl.rpc.registry.ServiceRegistry;
|
import com.xxl.rpc.remoting.invoker.XxlRpcInvokerFactory;
|
import com.xxl.rpc.remoting.invoker.call.CallType;
|
import com.xxl.rpc.remoting.invoker.reference.XxlRpcReferenceBean;
|
import com.xxl.rpc.remoting.invoker.route.LoadBalance;
|
import com.xxl.rpc.remoting.net.NetEnum;
|
import com.xxl.rpc.remoting.provider.XxlRpcProviderFactory;
|
import com.xxl.rpc.serialize.Serializer;
|
import com.xxl.rpc.util.IpUtil;
|
import com.xxl.rpc.util.NetUtil;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.util.*;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
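/**
 * Executor-side bootstrap: holds the executor configuration, starts and stops the
 * supporting threads and the RPC provider, and keeps the static job-handler and
 * job-thread repositories.
 *
 * Created by xuxueli on 2016/3/2 21:14.
 */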
|
public class XxlJobExecutor {
|
private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);
|
|
// ---------------------- param ----------------------
|
private String adminAddresses;
|
private String appName;
|
private String ip;
|
private int port;
|
private String accessToken;
|
private String logPath;
|
private int logRetentionDays;
|
|
public void setAdminAddresses(String adminAddresses) {
|
this.adminAddresses = adminAddresses;
|
}
|
public void setAppName(String appName) {
|
this.appName = appName;
|
}
|
public void setIp(String ip) {
|
this.ip = ip;
|
}
|
public void setPort(int port) {
|
this.port = port;
|
}
|
public void setAccessToken(String accessToken) {
|
this.accessToken = accessToken;
|
}
|
public void setLogPath(String logPath) {
|
this.logPath = logPath;
|
}
|
public void setLogRetentionDays(int logRetentionDays) {
|
this.logRetentionDays = logRetentionDays;
|
}
|
|
|
// ---------------------- start + stop ----------------------
|
public void start() throws Exception {
|
|
// init logpath
|
XxlJobFileAppender.initLogPath(logPath);
|
|
// init admin-client
|
initAdminBizList(adminAddresses, accessToken);
|
|
|
// init JobLogFileCleanThread
|
JobLogFileCleanThread.getInstance().start(logRetentionDays);
|
|
// init TriggerCallbackThread
|
TriggerCallbackThread.getInstance().start();
|
|
// init executor-server
|
port = port>0?port: NetUtil.findAvailablePort(9777);
|
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
|
initRpcProvider(ip, port, appName, accessToken);
|
}
|
public void destroy(){
|
// destory jobThreadRepository
|
if (jobThreadRepository.size() > 0) {
|
for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {
|
removeJobThread(item.getKey(), "web container destroy and kill the job.");
|
}
|
jobThreadRepository.clear();
|
}
|
|
|
// destory JobLogFileCleanThread
|
JobLogFileCleanThread.getInstance().toStop();
|
|
// destory TriggerCallbackThread
|
TriggerCallbackThread.getInstance().toStop();
|
|
// destory executor-server
|
stopRpcProvider();
|
}
|
|
|
// ---------------------- admin-client (rpc invoker) ----------------------
|
private static List<AdminBiz> adminBizList;
|
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
|
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
|
for (String address: adminAddresses.trim().split(",")) {
|
if (address!=null && address.trim().length()>0) {
|
|
String addressUrl = address.concat(AdminBiz.MAPPING);
|
|
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
|
NetEnum.JETTY,
|
Serializer.SerializeEnum.HESSIAN.getSerializer(),
|
CallType.SYNC,
|
LoadBalance.ROUND,
|
AdminBiz.class,
|
null,
|
10000,
|
addressUrl,
|
accessToken,
|
null,
|
null
|
).getObject();
|
|
if (adminBizList == null) {
|
adminBizList = new ArrayList<AdminBiz>();
|
}
|
adminBizList.add(adminBiz);
|
}
|
}
|
}
|
}
|
public static List<AdminBiz> getAdminBizList(){
|
return adminBizList;
|
}
|
|
|
// ---------------------- executor-server (rpc provider) ----------------------
|
private XxlRpcProviderFactory xxlRpcProviderFactory = null;
|
|
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
|
|
// init, provider factory
|
String address = IpUtil.getIpPort(ip, port);
|
Map<String, String> serviceRegistryParam = new HashMap<String, String>();
|
serviceRegistryParam.put("appName", appName);
|
serviceRegistryParam.put("address", address);
|
|
xxlRpcProviderFactory = new XxlRpcProviderFactory();
|
xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);
|
|
// add services
|
xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
|
|
// start
|
xxlRpcProviderFactory.start();
|
|
}
|
|
public static class ExecutorServiceRegistry extends ServiceRegistry {
|
|
@Override
|
public void start(Map<String, String> param) {
|
// start registry
|
ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));
|
}
|
@Override
|
public void stop() {
|
// stop registry
|
ExecutorRegistryThread.getInstance().toStop();
|
}
|
|
@Override
|
public boolean registry(Set<String> keys, String value) {
|
return false;
|
}
|
@Override
|
public boolean remove(Set<String> keys, String value) {
|
return false;
|
}
|
@Override
|
public Map<String, TreeSet<String>> discovery(Set<String> keys) {
|
return null;
|
}
|
@Override
|
public TreeSet<String> discovery(String key) {
|
return null;
|
}
|
|
}
|
|
private void stopRpcProvider() {
|
// stop invoker factory
|
try {
|
XxlRpcInvokerFactory.getInstance().stop();
|
} catch (Exception e) {
|
logger.error(e.getMessage(), e);
|
}
|
// stop provider factory
|
try {
|
xxlRpcProviderFactory.stop();
|
} catch (Exception e) {
|
logger.error(e.getMessage(), e);
|
}
|
}
|
|
|
// ---------------------- job handler repository ----------------------
|
private static ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
|
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
|
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
|
return jobHandlerRepository.put(name, jobHandler);
|
}
|
public static IJobHandler loadJobHandler(String name){
|
return jobHandlerRepository.get(name);
|
}
|
|
|
// ---------------------- job thread repository ----------------------
|
private static ConcurrentHashMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
|
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
|
JobThread newJobThread = new JobThread(jobId, handler);
|
newJobThread.start();
|
logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
|
|
JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
|
if (oldJobThread != null) {
|
oldJobThread.toStop(removeOldReason);
|
oldJobThread.interrupt();
|
}
|
|
return newJobThread;
|
}
|
public static void removeJobThread(int jobId, String removeOldReason){
|
JobThread oldJobThread = jobThreadRepository.remove(jobId);
|
if (oldJobThread != null) {
|
oldJobThread.toStop(removeOldReason);
|
oldJobThread.interrupt();
|
}
|
}
|
public static JobThread loadJobThread(int jobId){
|
JobThread jobThread = jobThreadRepository.get(jobId);
|
return jobThread;
|
}
|
|
}
|