diff --git a/leaf-core/src/main/java/com/sankuai/inf/leaf/segment/SegmentIDGenImpl.java b/leaf-core/src/main/java/com/sankuai/inf/leaf/segment/SegmentIDGenImpl.java index 21d13518..283a3065 100644 --- a/leaf-core/src/main/java/com/sankuai/inf/leaf/segment/SegmentIDGenImpl.java +++ b/leaf-core/src/main/java/com/sankuai/inf/leaf/segment/SegmentIDGenImpl.java @@ -200,6 +200,7 @@ public void updateSegmentFromDb(String key, Segment segment) { } public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) { + boolean continueTake = true; while (true) { buffer.rLock().lock(); try { @@ -232,11 +233,21 @@ public void run() { long value = segment.getValue().getAndIncrement(); if (value < segment.getMax()) { return new Result(value, Status.SUCCESS); + }else{ + if (!continueTake) { + logger.error("Both two segments in {} are not ready!", buffer); + return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION); + } } } finally { buffer.rLock().unlock(); } waitAndSleep(buffer); + if (!buffer.isNextReady()) { + continueTake = false; + continue; + } + buffer.wLock().lock(); try { final Segment segment = buffer.getCurrent(); @@ -247,9 +258,6 @@ public void run() { if (buffer.isNextReady()) { buffer.switchPos(); buffer.setNextReady(false); - } else { - logger.error("Both two segments in {} are not ready!", buffer); - return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION); } } finally { buffer.wLock().unlock(); diff --git a/leaf-core/src/test/java/com/sankuai/inf/leaf/segment/IDGenServiceTest.java b/leaf-core/src/test/java/com/sankuai/inf/leaf/segment/IDGenServiceTest.java index 000dfe16..4f02985f 100644 --- a/leaf-core/src/test/java/com/sankuai/inf/leaf/segment/IDGenServiceTest.java +++ b/leaf-core/src/test/java/com/sankuai/inf/leaf/segment/IDGenServiceTest.java @@ -4,17 +4,25 @@ import com.sankuai.inf.leaf.IDGen; import com.sankuai.inf.leaf.common.PropertyFactory; import com.sankuai.inf.leaf.common.Result; +import com.sankuai.inf.leaf.common.Status; import com.sankuai.inf.leaf.segment.dao.IDAllocDao; import com.sankuai.inf.leaf.segment.dao.impl.IDAllocDaoImpl; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.perf4j.StopWatch; +import org.perf4j.slf4j.Slf4JStopWatch; import java.io.IOException; import java.sql.SQLException; -import java.util.Properties; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + public class IDGenServiceTest { + public static final AtomicInteger threadSeq = new AtomicInteger(0); IDGen idGen; DruidDataSource dataSource; @Before @@ -49,4 +57,115 @@ public void after() { dataSource.close(); } + + @Test + public void testConcurrentAcquire() throws Exception{ + int threadNum=200; + //每个线程取号1000次 + int takeNumberTimes = 1000; + /** + * 修改数据库中的步长为10 + */ + int step=10; + ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(); + CountDownLatch waitForRunLatch = new CountDownLatch(1); + CountDownLatch mainThreadLatch = new CountDownLatch(threadNum); + String tag = "leaf-segment-test"; + AtomicInteger failCount = new AtomicInteger(0); + + for (int i = 0; i < threadNum; i++) { + new Thread(new TakeNumberTask(mainThreadLatch, waitForRunLatch, idGen, takeNumberTimes, tag, concurrentHashMap,failCount)).start(); + } + + StopWatch mainThreadWatch = new Slf4JStopWatch(); + waitForRunLatch.countDown(); + mainThreadLatch.await(); + final long elapsedTime = mainThreadWatch.getElapsedTime(); + mainThreadWatch.stop("所有线程取号完毕"); + System.out.println("所有线程取号完毕:总共耗时:" + elapsedTime + " 总共获取失败次数:" + failCount.get() ); + final Map sortedMap = sortMapByValue(concurrentHashMap); + final Iterator> iterator = sortedMap.entrySet().iterator(); + while (iterator.hasNext()) { + final Map.Entry next = iterator.next(); + System.out.println("线程名称:"+ next.getKey()+" 耗时:"+next.getValue()); + } + + } + + public static Map sortMapByValue(Map oriMap) { + if (oriMap == null || oriMap.isEmpty()) { + return null; + } + Map sortedMap = new LinkedHashMap(); + List> entryList = new ArrayList>( + oriMap.entrySet()); + Collections.sort(entryList, new MapValueComparator()); + + Iterator> iter = entryList.iterator(); + Map.Entry tmpEntry = null; + while (iter.hasNext()) { + tmpEntry = iter.next(); + sortedMap.put(tmpEntry.getKey(), tmpEntry.getValue()); + } + return sortedMap; + } + + + + static class TakeNumberTask implements Runnable { + private CountDownLatch waitForRunLatch; + private CountDownLatch mainThreadLatch; + private IDGen idGen; + private int takeNumberTimes; + private String tag; + private ConcurrentHashMap concurrentHashMap; + private AtomicInteger failCount ; + + public TakeNumberTask(CountDownLatch mainThreadLatch,CountDownLatch waitForRunLatch, IDGen idGen,int takeNumerTimes,String tag,ConcurrentHashMap concurrentHashMap,AtomicInteger failCount) { + this.mainThreadLatch = mainThreadLatch; + this.waitForRunLatch = waitForRunLatch; + this.idGen = idGen; + this.takeNumberTimes = takeNumerTimes; + this.tag = tag; + this.concurrentHashMap = concurrentHashMap; + this.failCount = failCount; + } + + @Override + public void run() { + Thread thread = Thread.currentThread(); + String threadName = "UserThread-takeNumber-seq-" + threadSeq.incrementAndGet(); + thread.setName(threadName); + try { + waitForRunLatch.await(); + + StopWatch stopWatch = new Slf4JStopWatch(); + int takeNumFail = 0; + for (int i = 0; i < takeNumberTimes; i++) { + final Result result = idGen.get(tag); + if (result.getStatus().equals(Status.EXCEPTION)) { + takeNumFail++; + } + } + final long elapsedTime = stopWatch.getElapsedTime(); + stopWatch.stop("take number complete"); + concurrentHashMap.put(threadName, elapsedTime); + failCount.addAndGet(takeNumFail); + mainThreadLatch.countDown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + } + + static class MapValueComparator implements Comparator> { + + @Override + public int compare(Map.Entry me1, Map.Entry me2) { + + return me2.getValue().compareTo(me1.getValue()); + } + } + }