Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
Rolling back an unnecessary change where a deadlock may have been int…
Browse files Browse the repository at this point in the history
…roduced.
  • Loading branch information
jongoldDB committed Jan 18, 2019
1 parent 5b0ed90 commit 16d7d83
Showing 1 changed file with 25 additions and 36 deletions.
61 changes: 25 additions & 36 deletions Ambrosia/Ambrosia/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2799,7 +2799,6 @@ private void ProcessRPC(FlexReadBuffer RpcBuffer)
private async Task ToDataStreamAsync(Stream writeToStream,
string destString,
CancellationToken ct)

{
OutputConnectionRecord outputConnectionRecord;
if (destString.Equals(_serviceName))
Expand All @@ -2822,19 +2821,10 @@ private async Task ToDataStreamAsync(Stream writeToStream,
}
try
{
lock (outputConnectionRecord)
{
if (outputConnectionRecord.WillResetConnection)
{
// Register our immediate intent to set the connection. This unblocks output writers which may be important for unlocking the outputConnectionRecord
outputConnectionRecord.ResettingConnection = true;
}

// Reset the output cursor if it exists
outputConnectionRecord.BufferedOutput.AcquireTrimLock(2);
outputConnectionRecord.placeInOutput = new EventBuffer.BuffersCursor(null, -1, 0);
outputConnectionRecord.BufferedOutput.ReleaseTrimLock();
}
// Reset the output cursor if it exists
outputConnectionRecord.BufferedOutput.AcquireTrimLock(2);
outputConnectionRecord.placeInOutput = new EventBuffer.BuffersCursor(null, -1, 0);
outputConnectionRecord.BufferedOutput.ReleaseTrimLock();
// Process replay message
var inputFlexBuffer = new FlexReadBuffer();
await FlexReadBuffer.DeserializeAsync(writeToStream, inputFlexBuffer, ct);
Expand All @@ -2843,40 +2833,45 @@ private async Task ToDataStreamAsync(Stream writeToStream,
var commitSeqNo = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, sizeBytes + 1);
var commitSeqNoReplayable = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, sizeBytes + 1 + StreamCommunicator.LongSize(commitSeqNo));
inputFlexBuffer.ResetBuffer();
lock (outputConnectionRecord)
if (outputConnectionRecord.ConnectingAfterRestart)
{
if (outputConnectionRecord.ConnectingAfterRestart)
// We've been through recovery (at least partially), and have scrubbed all ephemeral calls. Must now rebase
// seq nos using the markers which were sent by the listener. Must first take locks to ensure no interference
lock (outputConnectionRecord)
{
// We've been through recovery (at least partially), and have scrubbed all ephemeral calls. Must now rebase
// seq nos using the markers which were sent by the listener. Must first take locks to ensure no interference
// Don't think I actually need this lock, but can't hurt and shouldn't affect perf.
outputConnectionRecord.BufferedOutput.AcquireTrimLock(2);
outputConnectionRecord.BufferedOutput.RebaseSeqNosInBuffer(commitSeqNo, commitSeqNoReplayable);
outputConnectionRecord.LastSeqNoFromLocalService += commitSeqNo - commitSeqNoReplayable;
outputConnectionRecord.ConnectingAfterRestart = false;
outputConnectionRecord.BufferedOutput.ReleaseTrimLock();
}
}

// If recovering, make sure event replay will be filtered out
outputConnectionRecord.ReplayFrom = commitSeqNo;
// If recovering, make sure event replay will be filtered out
outputConnectionRecord.ReplayFrom = commitSeqNo;

if (outputConnectionRecord.WillResetConnection)
if (outputConnectionRecord.WillResetConnection)
{
// Register our immediate intent to set the connection. This unblocks output writers
outputConnectionRecord.ResettingConnection = true;
// This lock avoids interference with buffering RPCs
lock (outputConnectionRecord)
{
// This lock avoids interference with buffering RPCs
// If first reconnect/connect after reset, simply adjust the seq no for the first sent message to the received commit seq no
outputConnectionRecord.ResettingConnection = false;
outputConnectionRecord.LastSeqNoFromLocalService = outputConnectionRecord.BufferedOutput.AdjustFirstSeqNoTo(commitSeqNo);
outputConnectionRecord.WillResetConnection = false;
}
outputConnectionRecord.LastSeqSentToReceiver = commitSeqNo - 1;
}
outputConnectionRecord.LastSeqSentToReceiver = commitSeqNo - 1;

// Enqueue a replay send
if (outputConnectionRecord._sendsEnqueued == 0)
{
// Enqueue a replay send
if (outputConnectionRecord._sendsEnqueued == 0)
{

Interlocked.Increment(ref outputConnectionRecord._sendsEnqueued);
outputConnectionRecord.DataWorkQ.Enqueue(-1);
}
Interlocked.Increment(ref outputConnectionRecord._sendsEnqueued);
outputConnectionRecord.DataWorkQ.Enqueue(-1);
}

// Make sure enough recovery output has been produced before we allow output to start being sent, which means that the next
Expand All @@ -2891,17 +2886,11 @@ private async Task ToDataStreamAsync(Stream writeToStream,
{
// This is a send output
Interlocked.Decrement(ref outputConnectionRecord._sendsEnqueued);

// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Code to manually trim for performance testing
// int placeToTrimTo = outputConnectionRecord.LastSeqNoFromLocalService;
// Console.WriteLine("send to {0}", outputConnectionRecord.LastSeqNoFromLocalService);
outputConnectionRecord.BufferedOutput.AcquireTrimLock(2);
outputConnectionRecord.placeInOutput =
await outputConnectionRecord.BufferedOutput.SendAsync(writeToStream, outputConnectionRecord.placeInOutput, reconnecting);
await outputConnectionRecord.BufferedOutput.SendAsync(writeToStream, outputConnectionRecord.placeInOutput, reconnecting);
reconnecting = false;
outputConnectionRecord.BufferedOutput.ReleaseTrimLock();
// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Code to manually trim for performance testing
// outputConnectionRecord.TrimTo = placeToTrimTo;
}
}
}
Expand Down

0 comments on commit 16d7d83

Please sign in to comment.