java – 并发配对
|
我正在寻找一种 java并发习惯用法来匹配具有最高吞吐量的大量元素. 考虑一下我有“人”来自多个线程.每个“人”都在寻找一场比赛.当它找到另一个等待的“人”时,它们相互匹配并被移除以进行处理. 我不想锁定一个大的结构来改变状态.考虑Person有getMatch和setMatch.在提交之前,每个人的#getMatch都是null.但是当他们解锁(或被捕获)时,他们要么已经过期,因为他们等待长时间的比赛或#getMatch是非空的. 保持高通过率的一些问题是,如果PersonA与PersonB同时提交.它们相互匹配,但PersonB也匹配已经在等待的PersonC. PersonB的状态在提交时变为“可用”.但是当PersonB与PersonC匹配时,PersonA不需要偶然获得PersonB.合理?另外,我想以异步方式执行此操作.换句话说,我不希望每个提交者都必须在具有waitForMatch类型的东西的线程上持有Person. 同样,我不希望请求必须在不同的线程上运行,但是如果有一个额外的匹配器线程也没关系. 似乎应该有一些成语,因为它似乎是一个非常普遍的事情.但我的谷歌搜索已经枯竭(我可能使用错误的条款). UPDATE 有几件事让我很难解决这个问题.一个是我不想在内存中有对象,我想让所有等待的候选人都使用redis或memcache或类似的东西.另一个是任何人可能有几个可能的比赛.考虑如下界面: person.getId(); // lets call this an Integer person.getFriendIds(); // a collection of other person ids 然后我有一个看起来像这样的服务器: MatchServer: submit( personId,expiration ) -> void // non-blocking returns immediately isDone( personId ) -> boolean // either expired or found a match getMatch( personId ) -> matchId // also non-blocking 这是一个休息界面,它会使用重定向,直到你得到结果.我的第一个想法是在MatchServer中有一个Cache,它由redis之类的东西支持,并且对于当前被锁定和被操作的对象具有并发的弱值哈希映射.每个personId将由持久状态对象包装,状态为已提交,匹配和过期. 到目前为止?非常简单,提交代码完成了初始工作,它是这样的: public void submit( Person p,long expiration ) {
MatchStatus incoming = new MatchStatus( p.getId(),expiration );
if ( !tryMatch( incoming,p.getFriendIds() ) )
cache.put( p.getId(),incoming );
}
public boolean isDone( Integer personId ) {
MatchStatus status = cache.get( personId );
status.lock();
try {
return status.isMatched() || status.isExpired();
} finally {
status.unlock();
}
}
public boolean tryMatch( MatchStatus incoming,Iterable<Integer> friends ) {
for ( Integer friend : friends ) {
if ( match( incoming,friend ) )
return true;
}
return false;
}
private boolean match( MatchStatus incoming,Integer waitingId ) {
CallStatus waiting = cache.get( waitingId );
if ( waiting == null )
return false;
waiting.lock();
try {
if ( waiting.isMatched() )
return false;
waiting.setMatch( incoming.getId() );
incoming.setMatch( waiting.getId() );
return true
} finally {
waiting.unlock();
}
}
所以这里的问题是,如果两个人同时进来并且他们是他们唯一的比赛,他们就不会找到对方.竞争条件对吗?我能看到解决它的唯一方法是同步“tryMatch()”.但这会影响我的吞吐量.我不能无限期地循环tryMatch,因为我需要这些非常短的调用. 那么有什么更好的方法来解决这个问题呢?我提出的每一个解决方案都会一次一个地强迫人们使用吞吐量.例如,创建后台线程并使用阻塞队列一次放入和接收传入线程. 任何指导将不胜感激. 解决方法您可以使用ConcurrentHashMap.我假设你的对象有他们可以匹配的密钥,例如PersonA和PersonB将具有“Person”键.ConcurrentHashMap<String,Match> map = new ConcurrentHashMap<>();
void addMatch(Match match) {
boolean success = false;
while(!success) {
Match oldMatch = map.remove(match.key);
if(oldMatch != null) {
match.setMatch(oldMatch);
success = true;
} else if(map.putIfAbsent(match.key,match) == null) {
success = true;
}
}
}
您将继续循环,直到您将匹配添加到地图,或者直到您删除现有匹配并将其配对为止. remove和putIfAbsent都是原子的. 编辑:因为您要将数据卸载到磁盘,您可以使用例如MongoDB到此,用其findAndModify方法.如果具有密钥的对象已经存在,则该命令将删除并返回它,以便您可以将旧对象与新对象配对,并且可能存储与新密钥关联的对;如果具有该键的对象不存在,则该命令将该对象与该键一起存储.这相当于ConcurrentHashMap的行为,除了数据存储在磁盘而不是内存中;您不必担心同时写入两个对象,因为findAndModify逻辑可以防止它们无意中占用相同的密钥. 如果需要将对象序列化为JSON,请使用Jackson. 还有Mongo的替代品,例如DynamoDB,虽然Dynamo只免费提供少量数据. 编辑:鉴于朋友列表不是自反性的,我认为你可以通过组合MongoDB(或其他键值数据库与原子更新)和ConcurrentHashMap来解决这个问题. > MongoDB中的人员“匹配”或“无与伦比”. (如果我说“从MongoDB中删除一个人”,我的意思是“将该人的状态设置为’匹配’.”) 在一般情况下,一个人要么与朋友匹配,要么在没有朋友加入系统的同时迭代通过朋友列表(即该人的ConcurrentHashMap将为空)时不匹配.如果同时写朋友: Friend1和Friend2同时添加. > Friend1写信给Friend2的ConcurrentHashMap,表示他们错过了对方. 一些打嗝: > Friend1和Friend2可能都没有将ConcurrentHashMaps与它们相关联,例如:如果Friend2在Friend1检查以查看地图是否在内存中时仍在初始化其哈希映射.这很好,因为Friend2将写入Friend1的哈希映射,因此我们保证最终会尝试匹配 – 至少其中一个将具有哈希映射而另一个迭代,因为哈希映射创建在迭代之前. 编辑:一些代码: addUnmatchedToMongo(person1)方法将一个“不匹配”的person1写入MongoDB setToMatched(friend1)使用findAndModify将friend1原子设置为“matched”;如果friend1已匹配或不存在,则该方法将返回false;如果更新成功,则返回true 如果friend1存在且匹配,则isMatched(friend1)返回true,如果它不存在或存在且“不匹配”,则返回false private ConcurrentHashMap<String,ConcurrentHashMap<String,Person>> globalMap;
private DelayQueue<DelayedRetry> delayQueue;
private ThreadPoolExecutor executor;
executor.execute(new Runnable() {
public void run() {
while(true) {
Runnable runnable = delayQueue.take();
executor.execute(runnable);
}
}
}
public static void findMatch(Person person,Collection<Person> friends) {
findMatch(person,friends,1);
}
public static void findMatch(Person person,Collection<Person> friends,int delayMultiplier) {
globalMap.put(person.id,new ConcurrentHashMap<String,Person>());
for(Person friend : friends) {
if(**setToMatched(friend)**) {
// write person to MongoDB in "matched" state
// write "Pair(person,friend)" to MongoDB so it can be queried by the end user
globalMap.remove(person.id);
return;
} else {
if(**!isMatched(friend)** && globalMap.get(person.id).get(friend.id) == null) {
// the existence of "friendMap" indicates another thread is currently trying to match the friend
ConcurrentHashMap<String,Person> friendMap = globalMap.get(friend.id);
if(friendMap != null) {
friendMap.put(person.id,person);
}
}
}
}
**addUnmatchedToMongo(person)**;
Collection<Person> retryFriends = globalMap.remove(person.id).values();
if(retryFriends.size() > 0) {
delayQueue.add(new DelayedRetry(500 * new Random().nextInt(30 * delayMultiplier),person,retryFriends,delayMultiplier));
}
}
public class DelayedRetry implements Runnable,Delayed {
private final long delay;
private final Person person;
private final Collection<Person> friends;
private final int delayMultiplier;
public DelayedRetry(long delay,Person person,delayMultiplier) {
this.delay = delay;
this.person = person;
this.friends = friends;
this.delayMultiplier = delayMultiplier;
}
public long getDelay(TimeUnit unit) {
return unit.convert(delay,TimeUnit.MILLISECONDS);
}
public void run {
findMatch(person,delayMultiplier + 1);
}
} (编辑:安卓应用网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
