From 3a74f6124a7b82ca210acafa5888aaf0a042255c Mon Sep 17 00:00:00 2001 From: agapple Date: Thu, 22 Aug 2019 23:48:19 +0800 Subject: [PATCH] fixed issue #1940 #2088 , add canal.register.ip support docker --- .../otter/canal/deployer/CanalConstants.java | 1 + .../otter/canal/deployer/CanalController.java | 163 +++++++++--------- .../otter/canal/deployer/CanalLauncher.java | 15 +- deployer/src/main/resources/canal.properties | 3 + 4 files changed, 99 insertions(+), 83 deletions(-) diff --git a/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java b/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java index 855499bf60..847fb3679a 100644 --- a/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java +++ b/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java @@ -14,6 +14,7 @@ public class CanalConstants { public static final String ROOT = "canal"; public static final String CANAL_ID = ROOT + "." + "id"; public static final String CANAL_IP = ROOT + "." + "ip"; + public static final String CANAL_REGISTER_IP = ROOT + "." + "register.ip"; public static final String CANAL_PORT = ROOT + "." + "port"; public static final String CANAL_METRICS_PULL_PORT = ROOT + "." + "metrics.pull.port"; public static final String CANAL_ADMIN_JMX_PORT = ROOT + "." + "admin.jmx.port"; diff --git a/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java b/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java index 6671287d95..0e35d2cea5 100644 --- a/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java +++ b/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java @@ -54,6 +54,7 @@ public class CanalController { private static final Logger logger = LoggerFactory.getLogger(CanalController.class); private Long cid; private String ip; + private String registerIp; private int port; // 默认使用spring的方式载入 private Map instanceConfigs; @@ -108,6 +109,7 @@ public CanalConfigClient apply(String managerAddress) { // 准备canal server cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID)); ip = getProperty(properties, CanalConstants.CANAL_IP); + registerIp = getProperty(properties, CanalConstants.CANAL_REGISTER_IP); port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT)); embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator @@ -127,9 +129,17 @@ public CanalConfigClient apply(String managerAddress) { } // 处理下ip为空,默认使用hostIp暴露到zk中 + if (StringUtils.isEmpty(ip) && StringUtils.isEmpty(registerIp)) { + ip = registerIp = AddressUtils.getHostIp(); + } + if (StringUtils.isEmpty(ip)) { ip = AddressUtils.getHostIp(); } + + if (StringUtils.isEmpty(registerIp)) { + registerIp = ip; // 兼容以前配置 + } final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS); if (StringUtils.isNotEmpty(zkServers)) { zkclientx = ZkClientx.getZkClient(zkServers); @@ -138,89 +148,88 @@ public CanalConfigClient apply(String managerAddress) { zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true); } - final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port); + final ServerRunningData serverData = new ServerRunningData(cid, registerIp + ":" + port); ServerRunningMonitors.setServerData(serverData); - ServerRunningMonitors - .setRunningMonitors(MigrateMap.makeComputingMap(new Function() { - - public ServerRunningMonitor apply(final String destination) { - ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData); - runningMonitor.setDestination(destination); - runningMonitor.setListener(new ServerRunningListener() { - - public void processActiveEnter() { - try { - MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); - embededCanalServer.start(destination); - if (canalMQStarter != null) { - canalMQStarter.startDestination(destination); - } - } finally { - MDC.remove(CanalConstants.MDC_DESTINATION); + ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function() { + + public ServerRunningMonitor apply(final String destination) { + ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData); + runningMonitor.setDestination(destination); + runningMonitor.setListener(new ServerRunningListener() { + + public void processActiveEnter() { + try { + MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); + embededCanalServer.start(destination); + if (canalMQStarter != null) { + canalMQStarter.startDestination(destination); } + } finally { + MDC.remove(CanalConstants.MDC_DESTINATION); } + } - public void processActiveExit() { - try { - MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); - if (canalMQStarter != null) { - canalMQStarter.stopDestination(destination); - } - embededCanalServer.stop(destination); - } finally { - MDC.remove(CanalConstants.MDC_DESTINATION); + public void processActiveExit() { + try { + MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); + if (canalMQStarter != null) { + canalMQStarter.stopDestination(destination); } + embededCanalServer.stop(destination); + } finally { + MDC.remove(CanalConstants.MDC_DESTINATION); } + } - public void processStart() { - try { - if (zkclientx != null) { - final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, - ip + ":" + port); - initCid(path); - zkclientx.subscribeStateChanges(new IZkStateListener() { - - public void handleStateChanged(KeeperState state) throws Exception { - - } - - public void handleNewSession() throws Exception { - initCid(path); - } - - @Override - public void handleSessionEstablishmentError(Throwable error) throws Exception { - logger.error("failed to connect to zookeeper", error); - } - }); - } - } finally { - MDC.remove(CanalConstants.MDC_DESTINATION); + public void processStart() { + try { + if (zkclientx != null) { + final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, + registerIp + ":" + port); + initCid(path); + zkclientx.subscribeStateChanges(new IZkStateListener() { + + public void handleStateChanged(KeeperState state) throws Exception { + + } + + public void handleNewSession() throws Exception { + initCid(path); + } + + @Override + public void handleSessionEstablishmentError(Throwable error) throws Exception { + logger.error("failed to connect to zookeeper", error); + } + }); } + } finally { + MDC.remove(CanalConstants.MDC_DESTINATION); } + } - public void processStop() { - try { - MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); - if (zkclientx != null) { - final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, - ip + ":" + port); - releaseCid(path); - } - } finally { - MDC.remove(CanalConstants.MDC_DESTINATION); + public void processStop() { + try { + MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); + if (zkclientx != null) { + final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, + registerIp + ":" + port); + releaseCid(path); } + } finally { + MDC.remove(CanalConstants.MDC_DESTINATION); } - - }); - if (zkclientx != null) { - runningMonitor.setZkClient(zkclientx); } - // 触发创建一下cid节点 - runningMonitor.init(); - return runningMonitor; + + }); + if (zkclientx != null) { + runningMonitor.setZkClient(zkclientx); } - })); + // 触发创建一下cid节点 + runningMonitor.init(); + return runningMonitor; + } + })); // 初始化monitor机制 autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN)); @@ -266,8 +275,7 @@ public void reload(String destination) { instanceConfigMonitors = MigrateMap.makeComputingMap(new Function() { public InstanceConfigMonitor apply(InstanceMode mode) { - int scanInterval = Integer - .valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL)); + int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL)); if (mode.isSpring()) { SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor(); @@ -375,8 +383,7 @@ private void initInstanceConfig(Properties properties) { InstanceConfig oldConfig = instanceConfigs.put(destination, config); if (oldConfig != null) { - logger - .warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config); + logger.warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config); } } } @@ -424,9 +431,9 @@ public static String getProperty(Properties properties, String key) { } public void start() throws Throwable { - logger.info("## start the canal server[{}:{}]", ip, port); + logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port); // 创建整个canal的工作节点 - final String path = ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port); + final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port); initCid(path); if (zkclientx != null) { this.zkclientx.subscribeStateChanges(new IZkStateListener() { @@ -501,14 +508,14 @@ public void stop() throws Throwable { } // 释放canal的工作节点 - releaseCid(ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port)); - logger.info("## stop the canal server[{}:{}]", ip, port); + releaseCid(ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port)); + logger.info("## stop the canal server[{}({}):{}]", ip, registerIp, port); if (zkclientx != null) { zkclientx.close(); } - //关闭时清理缓存 + // 关闭时清理缓存 if (instanceConfigs != null) { instanceConfigs.clear(); } diff --git a/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java b/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java index 5da5d28bab..cba2758080 100644 --- a/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java +++ b/deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java @@ -4,16 +4,16 @@ import java.util.Properties; import java.util.concurrent.CountDownLatch; -import com.alibaba.otter.canal.deployer.mbean.CanalServerAgent; -import com.alibaba.otter.canal.deployer.mbean.CanalServerMXBean; -import com.alibaba.otter.canal.deployer.mbean.CanalServerBean; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.alibaba.otter.canal.deployer.mbean.CanalServerAgent; +import com.alibaba.otter.canal.deployer.mbean.CanalServerBean; +import com.alibaba.otter.canal.deployer.mbean.CanalServerMXBean; +import com.alibaba.otter.canal.deployer.monitor.remote.RemoteCanalConfigMonitor; import com.alibaba.otter.canal.deployer.monitor.remote.RemoteConfigLoader; import com.alibaba.otter.canal.deployer.monitor.remote.RemoteConfigLoaderFactory; -import com.alibaba.otter.canal.deployer.monitor.remote.RemoteCanalConfigMonitor; /** * canal独立版本启动的入口类 @@ -80,8 +80,13 @@ public void onChange(Properties properties) { String jmxPort = properties.getProperty(CanalConstants.CANAL_ADMIN_JMX_PORT); if (StringUtils.isNotEmpty(jmxPort)) { String ip = properties.getProperty(CanalConstants.CANAL_IP); + String registerIp = properties.getProperty(CanalConstants.CANAL_REGISTER_IP); + if (StringUtils.isEmpty(registerIp)) { + // 兼容老的配置 + registerIp = ip; + } CanalServerMXBean canalServerMBean = new CanalServerBean(canalStater); - canalServerAgent = new CanalServerAgent(ip, Integer.parseInt(jmxPort), canalServerMBean); + canalServerAgent = new CanalServerAgent(registerIp, Integer.parseInt(jmxPort), canalServerMBean); Thread agentThread = new Thread(canalServerAgent::start); agentThread.start(); } diff --git a/deployer/src/main/resources/canal.properties b/deployer/src/main/resources/canal.properties index dacac6930e..dfc90da364 100644 --- a/deployer/src/main/resources/canal.properties +++ b/deployer/src/main/resources/canal.properties @@ -5,7 +5,10 @@ #canal.manager.jdbc.username=root #canal.manager.jdbc.password=121212 canal.id = 1 +# tcp bind ip canal.ip = +# register ip to zookeeper +canal.register.ip = canal.port = 11111 canal.metrics.pull.port = 11112 canal.admin.jmx.port = 11113