Skip to content

Commit

Permalink
fixed issue #1940 #2088 , add canal.register.ip support docker
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Aug 22, 2019
1 parent bd974e3 commit 3a74f61
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class CanalConstants {
public static final String ROOT = "canal";
public static final String CANAL_ID = ROOT + "." + "id";
public static final String CANAL_IP = ROOT + "." + "ip";
public static final String CANAL_REGISTER_IP = ROOT + "." + "register.ip";
public static final String CANAL_PORT = ROOT + "." + "port";
public static final String CANAL_METRICS_PULL_PORT = ROOT + "." + "metrics.pull.port";
public static final String CANAL_ADMIN_JMX_PORT = ROOT + "." + "admin.jmx.port";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class CanalController {
private static final Logger logger = LoggerFactory.getLogger(CanalController.class);
private Long cid;
private String ip;
private String registerIp;
private int port;
// 默认使用spring的方式载入
private Map<String, InstanceConfig> instanceConfigs;
Expand Down Expand Up @@ -108,6 +109,7 @@ public CanalConfigClient apply(String managerAddress) {
// 准备canal server
cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
ip = getProperty(properties, CanalConstants.CANAL_IP);
registerIp = getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
embededCanalServer = CanalServerWithEmbedded.instance();
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
Expand All @@ -127,9 +129,17 @@ public CanalConfigClient apply(String managerAddress) {
}

// 处理下ip为空,默认使用hostIp暴露到zk中
if (StringUtils.isEmpty(ip) && StringUtils.isEmpty(registerIp)) {
ip = registerIp = AddressUtils.getHostIp();
}

if (StringUtils.isEmpty(ip)) {
ip = AddressUtils.getHostIp();
}

if (StringUtils.isEmpty(registerIp)) {
registerIp = ip; // 兼容以前配置
}
final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
if (StringUtils.isNotEmpty(zkServers)) {
zkclientx = ZkClientx.getZkClient(zkServers);
Expand All @@ -138,89 +148,88 @@ public CanalConfigClient apply(String managerAddress) {
zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
}

final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
final ServerRunningData serverData = new ServerRunningData(cid, registerIp + ":" + port);
ServerRunningMonitors.setServerData(serverData);
ServerRunningMonitors
.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {

public ServerRunningMonitor apply(final String destination) {
ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
runningMonitor.setDestination(destination);
runningMonitor.setListener(new ServerRunningListener() {

public void processActiveEnter() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
embededCanalServer.start(destination);
if (canalMQStarter != null) {
canalMQStarter.startDestination(destination);
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {

public ServerRunningMonitor apply(final String destination) {
ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
runningMonitor.setDestination(destination);
runningMonitor.setListener(new ServerRunningListener() {

public void processActiveEnter() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
embededCanalServer.start(destination);
if (canalMQStarter != null) {
canalMQStarter.startDestination(destination);
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}

public void processActiveExit() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
if (canalMQStarter != null) {
canalMQStarter.stopDestination(destination);
}
embededCanalServer.stop(destination);
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
public void processActiveExit() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
if (canalMQStarter != null) {
canalMQStarter.stopDestination(destination);
}
embededCanalServer.stop(destination);
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}

public void processStart() {
try {
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
ip + ":" + port);
initCid(path);
zkclientx.subscribeStateChanges(new IZkStateListener() {

public void handleStateChanged(KeeperState state) throws Exception {

}

public void handleNewSession() throws Exception {
initCid(path);
}

@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
logger.error("failed to connect to zookeeper", error);
}
});
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
public void processStart() {
try {
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
registerIp + ":" + port);
initCid(path);
zkclientx.subscribeStateChanges(new IZkStateListener() {

public void handleStateChanged(KeeperState state) throws Exception {

}

public void handleNewSession() throws Exception {
initCid(path);
}

@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
logger.error("failed to connect to zookeeper", error);
}
});
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}

public void processStop() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
ip + ":" + port);
releaseCid(path);
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
public void processStop() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
registerIp + ":" + port);
releaseCid(path);
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}

});
if (zkclientx != null) {
runningMonitor.setZkClient(zkclientx);
}
// 触发创建一下cid节点
runningMonitor.init();
return runningMonitor;

});
if (zkclientx != null) {
runningMonitor.setZkClient(zkclientx);
}
}));
// 触发创建一下cid节点
runningMonitor.init();
return runningMonitor;
}
}));

// 初始化monitor机制
autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
Expand Down Expand Up @@ -266,8 +275,7 @@ public void reload(String destination) {
instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {

public InstanceConfigMonitor apply(InstanceMode mode) {
int scanInterval = Integer
.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL));

if (mode.isSpring()) {
SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
Expand Down Expand Up @@ -375,8 +383,7 @@ private void initInstanceConfig(Properties properties) {
InstanceConfig oldConfig = instanceConfigs.put(destination, config);

if (oldConfig != null) {
logger
.warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
logger.warn("destination:{} old config:{} has replace by new config:{}", destination, oldConfig, config);
}
}
}
Expand Down Expand Up @@ -424,9 +431,9 @@ public static String getProperty(Properties properties, String key) {
}

public void start() throws Throwable {
logger.info("## start the canal server[{}:{}]", ip, port);
logger.info("## start the canal server[{}({}):{}]", ip, registerIp, port);
// 创建整个canal的工作节点
final String path = ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port);
final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
initCid(path);
if (zkclientx != null) {
this.zkclientx.subscribeStateChanges(new IZkStateListener() {
Expand Down Expand Up @@ -501,14 +508,14 @@ public void stop() throws Throwable {
}

// 释放canal的工作节点
releaseCid(ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port));
logger.info("## stop the canal server[{}:{}]", ip, port);
releaseCid(ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port));
logger.info("## stop the canal server[{}({}):{}]", ip, registerIp, port);

if (zkclientx != null) {
zkclientx.close();
}

//关闭时清理缓存
// 关闭时清理缓存
if (instanceConfigs != null) {
instanceConfigs.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import com.alibaba.otter.canal.deployer.mbean.CanalServerAgent;
import com.alibaba.otter.canal.deployer.mbean.CanalServerMXBean;
import com.alibaba.otter.canal.deployer.mbean.CanalServerBean;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.otter.canal.deployer.mbean.CanalServerAgent;
import com.alibaba.otter.canal.deployer.mbean.CanalServerBean;
import com.alibaba.otter.canal.deployer.mbean.CanalServerMXBean;
import com.alibaba.otter.canal.deployer.monitor.remote.RemoteCanalConfigMonitor;
import com.alibaba.otter.canal.deployer.monitor.remote.RemoteConfigLoader;
import com.alibaba.otter.canal.deployer.monitor.remote.RemoteConfigLoaderFactory;
import com.alibaba.otter.canal.deployer.monitor.remote.RemoteCanalConfigMonitor;

/**
* canal独立版本启动的入口类
Expand Down Expand Up @@ -80,8 +80,13 @@ public void onChange(Properties properties) {
String jmxPort = properties.getProperty(CanalConstants.CANAL_ADMIN_JMX_PORT);
if (StringUtils.isNotEmpty(jmxPort)) {
String ip = properties.getProperty(CanalConstants.CANAL_IP);
String registerIp = properties.getProperty(CanalConstants.CANAL_REGISTER_IP);
if (StringUtils.isEmpty(registerIp)) {
// 兼容老的配置
registerIp = ip;
}
CanalServerMXBean canalServerMBean = new CanalServerBean(canalStater);
canalServerAgent = new CanalServerAgent(ip, Integer.parseInt(jmxPort), canalServerMBean);
canalServerAgent = new CanalServerAgent(registerIp, Integer.parseInt(jmxPort), canalServerMBean);
Thread agentThread = new Thread(canalServerAgent::start);
agentThread.start();
}
Expand Down
3 changes: 3 additions & 0 deletions deployer/src/main/resources/canal.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
#canal.manager.jdbc.username=root
#canal.manager.jdbc.password=121212
canal.id = 1
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.admin.jmx.port = 11113
Expand Down

0 comments on commit 3a74f61

Please sign in to comment.