From 35e50c418f44fa2669cacac323b8653c8ddb0540 Mon Sep 17 00:00:00 2001 From: Jonathan Goldstein Date: Fri, 18 Jan 2019 11:04:02 -0800 Subject: [PATCH] Only corruption bug fix. No proactive extra locking --- Ambrosia/Ambrosia/Program.cs | 212 +++++++++++++++++------------------ 1 file changed, 102 insertions(+), 110 deletions(-) diff --git a/Ambrosia/Ambrosia/Program.cs b/Ambrosia/Ambrosia/Program.cs index 8d195b4f..1b229e59 100644 --- a/Ambrosia/Ambrosia/Program.cs +++ b/Ambrosia/Ambrosia/Program.cs @@ -438,7 +438,7 @@ internal async Task SendAsync(Stream outputStream, if (posToStart == 0) { // We are starting to send contents of the page. Send everything - numReplayableMessagesToSend = (int) curBuffer.TotalReplayableMessages; + numReplayableMessagesToSend = (int)curBuffer.TotalReplayableMessages; } else { @@ -490,8 +490,8 @@ internal async Task SendAsync(Stream outputStream, _owningOutputRecord.LastSeqSentToReceiver += numRPCs; // Must handle cases where trim came in during the actual send and reset or pushed the iterator - if ((_owningOutputRecord.placeInOutput != null) && - ((_owningOutputRecord.placeInOutput.PageEnumerator != bufferEnumerator) || + if ((_owningOutputRecord.placeInOutput != null) && + ((_owningOutputRecord.placeInOutput.PageEnumerator != bufferEnumerator) || _owningOutputRecord.placeInOutput.PagePos == -1)) { // Trim replaced the enumerator. Must reset @@ -597,7 +597,7 @@ private void addBufferPage(int writeLength, ReleaseAppendLock(); while (!_pool.TryDequeue(out bufferPage)) { - if (_owningRuntime.Recovering || _owningOutputRecord.ResettingConnection || + if (_owningRuntime.Recovering || _owningOutputRecord.ResettingConnection || _owningRuntime.CheckpointingService || _owningOutputRecord.ConnectingAfterRestart) { var newBufferPageBytes = new byte[Math.Max(defaultPageSize, writeLength)]; @@ -754,7 +754,7 @@ internal long TrimAndUnbufferNonreplayableCalls(long trimSeqNo, { return matchingReplayableSeqNo; } - // No locking necessary since this should only get called during recovery before replay and before a checkpoint is sent to service + // No locking necessary since this should only get called during recovery before replay and before a checkpooint is sent to service // First trim long highestTrimmedSeqNo = -1; while (!_bufferQ.IsEmpty()) @@ -780,7 +780,7 @@ internal long TrimAndUnbufferNonreplayableCalls(long trimSeqNo, { // May need to remove some data from the page int readBufferPos = 0; - for (var i = currentHead.LowestSeqNo; i <= trimSeqNo; i++ ) + for (var i = currentHead.LowestSeqNo; i <= trimSeqNo; i++) { int eventSize = currentHead.PageBytes.ReadBufferedInt(readBufferPos); var methodID = currentHead.PageBytes.ReadBufferedInt(readBufferPos + StreamCommunicator.IntSize(eventSize) + 2); @@ -830,7 +830,7 @@ internal long TrimAndUnbufferNonreplayableCalls(long trimSeqNo, return nextReplayableSeqNo - 1; } - internal void RebaseSeqNosInBuffer(long commitSeqNo, + internal void RebaseSeqNosInBuffer(long commitSeqNo, long commitSeqNoReplayable) { var seqNoDiff = commitSeqNo - commitSeqNoReplayable; @@ -884,6 +884,7 @@ internal class OutputConnectionRecord public long _sendsEnqueued; public AmbrosiaRuntime MyAmbrosia { get; set; } public bool WillResetConnection { get; set; } + public bool ResettingConnection { get; set; } public bool ConnectingAfterRestart { get; set; } // The latest trim location on the other side. An associated trim message MAY have already been sent public long RemoteTrim { get; set; } @@ -891,7 +892,6 @@ internal class OutputConnectionRecord public long RemoteTrimReplayable { get; set; } // The seq no of the last RPC sent to the receiver public long LastSeqSentToReceiver; - internal volatile bool ResettingConnection; public OutputConnectionRecord(AmbrosiaRuntime inAmbrosia) { @@ -1139,12 +1139,8 @@ private void SendInputWatermarks(ConcurrentDictionary uncommit outputs[kv.Key] = outputConnectionRecord; Console.WriteLine("Adding output:{0}", kv.Key); } - // Make sure RemoteTrim and RemoteTrimReplayable are atomically updated w.r.t. send - lock (outputConnectionRecord) - { - outputConnectionRecord.RemoteTrim = Math.Max(kv.Value.First, outputConnectionRecord.RemoteTrim); - outputConnectionRecord.RemoteTrimReplayable = Math.Max(kv.Value.Second, outputConnectionRecord.RemoteTrimReplayable); - } + outputConnectionRecord.RemoteTrim = Math.Max(kv.Value.First, outputConnectionRecord.RemoteTrim); + outputConnectionRecord.RemoteTrimReplayable = Math.Max(kv.Value.Second, outputConnectionRecord.RemoteTrimReplayable); if (outputConnectionRecord.ControlWorkQ.IsEmpty) { outputConnectionRecord.ControlWorkQ.Enqueue(-2); @@ -2532,63 +2528,63 @@ private void LocalListener() // Do an async message read. Note that the async aspect of this is slow. FlexReadBuffer.Deserialize(_localServiceReceiveFromStream, localServiceBuffer); ProcessSyncLocalMessage(ref localServiceBuffer, batchServiceBuffer); -/* Disabling because of BUGBUG. Eats checkpoint bytes in some circumstances before checkpointer can deal with it. - // Process more messages from the local service if available before going async again, doing this here because - // not all language shims will be good citizens here, and we may need to process small messages to avoid inefficiencies - // in LAR. - int curPosInBuffer = 0; - int readBytes = 0; - while (readBytes != 0 || _localServiceReceiveFromStream.DataAvailable) - { - // Read data into buffer to avoid lock contention of reading directly from the stream - while ((_localServiceReceiveFromStream.DataAvailable && readBytes < bufferSize) || !bytes.EnoughBytesForReadBufferedInt(0, readBytes)) - { - readBytes += _localServiceReceiveFromStream.Read(bytes, readBytes, bufferSize - readBytes); - } - // Continue loop as long as we can meaningfully read a message length - var memStream = new MemoryStream(bytes, 0, readBytes); - while (bytes.EnoughBytesForReadBufferedInt(curPosInBuffer, readBytes - curPosInBuffer)) - { - // Read the length of the next message - var messageSize = memStream.ReadInt(); - var messageSizeSize = StreamCommunicator.IntSize(messageSize); - memStream.Position -= messageSizeSize; - if (curPosInBuffer + messageSizeSize + messageSize > readBytes) - { - // didn't read the full message into the buffer. It must be torn - if (messageSize + messageSizeSize > bufferSize) - { - // Buffer isn't big enough to hold the whole torn event even if empty. Increase the buffer size so the message can fit. - bufferSize = messageSize + messageSizeSize; - var newBytes = new byte[bufferSize]; - Buffer.BlockCopy(bytes, curPosInBuffer, newBytes, 0, readBytes - curPosInBuffer); - bytes = newBytes; - bytesBak = new byte[bufferSize]; - readBytes -= curPosInBuffer; - curPosInBuffer = 0; - } - break; - } - else - { - // Count this message since it is fully in the buffer - FlexReadBuffer.Deserialize(memStream, localServiceBuffer); - ProcessSyncLocalMessage(ref localServiceBuffer, batchServiceBuffer); - curPosInBuffer += messageSizeSize + messageSize; - } - } - memStream.Dispose(); - // Shift torn message to the beginning unless it is the first one - if (curPosInBuffer > 0) - { - Buffer.BlockCopy(bytes, curPosInBuffer, bytesBak, 0, readBytes - curPosInBuffer); - var tempBytes = bytes; - bytes = bytesBak; - bytesBak = tempBytes; - readBytes -= curPosInBuffer; - curPosInBuffer = 0; - } - } */ + /* Disabling because of BUGBUG. Eats checkpoint bytes in some circumstances before checkpointer can deal with it. + // Process more messages from the local service if available before going async again, doing this here because + // not all language shims will be good citizens here, and we may need to process small messages to avoid inefficiencies + // in LAR. + int curPosInBuffer = 0; + int readBytes = 0; + while (readBytes != 0 || _localServiceReceiveFromStream.DataAvailable) + { + // Read data into buffer to avoid lock contention of reading directly from the stream + while ((_localServiceReceiveFromStream.DataAvailable && readBytes < bufferSize) || !bytes.EnoughBytesForReadBufferedInt(0, readBytes)) + { + readBytes += _localServiceReceiveFromStream.Read(bytes, readBytes, bufferSize - readBytes); + } + // Continue loop as long as we can meaningfully read a message length + var memStream = new MemoryStream(bytes, 0, readBytes); + while (bytes.EnoughBytesForReadBufferedInt(curPosInBuffer, readBytes - curPosInBuffer)) + { + // Read the length of the next message + var messageSize = memStream.ReadInt(); + var messageSizeSize = StreamCommunicator.IntSize(messageSize); + memStream.Position -= messageSizeSize; + if (curPosInBuffer + messageSizeSize + messageSize > readBytes) + { + // didn't read the full message into the buffer. It must be torn + if (messageSize + messageSizeSize > bufferSize) + { + // Buffer isn't big enough to hold the whole torn event even if empty. Increase the buffer size so the message can fit. + bufferSize = messageSize + messageSizeSize; + var newBytes = new byte[bufferSize]; + Buffer.BlockCopy(bytes, curPosInBuffer, newBytes, 0, readBytes - curPosInBuffer); + bytes = newBytes; + bytesBak = new byte[bufferSize]; + readBytes -= curPosInBuffer; + curPosInBuffer = 0; + } + break; + } + else + { + // Count this message since it is fully in the buffer + FlexReadBuffer.Deserialize(memStream, localServiceBuffer); + ProcessSyncLocalMessage(ref localServiceBuffer, batchServiceBuffer); + curPosInBuffer += messageSizeSize + messageSize; + } + } + memStream.Dispose(); + // Shift torn message to the beginning unless it is the first one + if (curPosInBuffer > 0) + { + Buffer.BlockCopy(bytes, curPosInBuffer, bytesBak, 0, readBytes - curPosInBuffer); + var tempBytes = bytes; + bytes = bytesBak; + bytesBak = tempBytes; + readBytes -= curPosInBuffer; + curPosInBuffer = 0; + } + } */ } } catch (Exception e) @@ -2622,7 +2618,7 @@ private void ProcessSyncLocalMessage(ref FlexReadBuffer localServiceBuffer, Flex Console.WriteLine("Reading a checkpoint {0} bytes", _lastReceivedCheckpointSize); LastReceivedCheckpoint = localServiceBuffer; // Block this thread until checkpointing is complete - while (LastReceivedCheckpoint != null) { Thread.Yield();}; + while (LastReceivedCheckpoint != null) { Thread.Yield(); }; break; case attachToByte: @@ -2697,7 +2693,7 @@ private void ProcessSyncLocalMessage(ref FlexReadBuffer localServiceBuffer, Flex break; default: - // This one really should terminate the process; no recovery allowed. + // This one really should terminate the process; no recovery allowed. OnError(0, "Illegal leading byte in local message"); break; } @@ -2764,7 +2760,7 @@ private void ProcessRPC(FlexReadBuffer RpcBuffer) writablePage.HighestSeqNo = _shuffleOutputRecord.LastSeqNoFromLocalService + 1; var methodID = RpcBuffer.Buffer.ReadBufferedInt(restOfRPCOffset + 1); - if (RpcBuffer.Buffer[restOfRPCOffset + 1 + StreamCommunicator.IntSize(methodID)] != (byte) RpcTypes.RpcType.Impulse) + if (RpcBuffer.Buffer[restOfRPCOffset + 1 + StreamCommunicator.IntSize(methodID)] != (byte)RpcTypes.RpcType.Impulse) { writablePage.UnsentReplayableMessages++; writablePage.TotalReplayableMessages++; @@ -2799,6 +2795,7 @@ private void ProcessRPC(FlexReadBuffer RpcBuffer) private async Task ToDataStreamAsync(Stream writeToStream, string destString, CancellationToken ct) + { OutputConnectionRecord outputConnectionRecord; if (destString.Equals(_serviceName)) @@ -2886,11 +2883,18 @@ private async Task ToDataStreamAsync(Stream writeToStream, { // This is a send output Interlocked.Decrement(ref outputConnectionRecord._sendsEnqueued); + + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Code to manually trim for performance testing + // int placeToTrimTo = outputConnectionRecord.LastSeqNoFromLocalService; + // Console.WriteLine("send to {0}", outputConnectionRecord.LastSeqNoFromLocalService); outputConnectionRecord.BufferedOutput.AcquireTrimLock(2); + var placeAtCall = outputConnectionRecord.LastSeqSentToReceiver; outputConnectionRecord.placeInOutput = - await outputConnectionRecord.BufferedOutput.SendAsync(writeToStream, outputConnectionRecord.placeInOutput, reconnecting); + await outputConnectionRecord.BufferedOutput.SendAsync(writeToStream, outputConnectionRecord.placeInOutput, reconnecting); reconnecting = false; outputConnectionRecord.BufferedOutput.ReleaseTrimLock(); + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Code to manually trim for performance testing + // outputConnectionRecord.TrimTo = placeToTrimTo; } } } @@ -2964,13 +2968,8 @@ private async Task ToControlStreamAsync(Stream writeToStream, if (lastRemoteTrim < outputConnectionRecord.RemoteTrim) { // This is a send watermark - // Get the new watermarks without interference from commit - long lastRemoteTrimReplayable; - lock (outputConnectionRecord) - { - lastRemoteTrim = outputConnectionRecord.RemoteTrim; - lastRemoteTrimReplayable = outputConnectionRecord.RemoteTrimReplayable; - } + lastRemoteTrim = outputConnectionRecord.RemoteTrim; + var lastRemoteTrimReplayable = outputConnectionRecord.RemoteTrimReplayable; watermarkStream.Position = 0; var watermarkLength = 1 + StreamCommunicator.LongSize(lastRemoteTrim) + StreamCommunicator.LongSize(lastRemoteTrimReplayable); watermarkStream.WriteInt(watermarkLength); @@ -3051,10 +3050,10 @@ private async Task FromDataStreamAsync(Stream readFromStream, { Console.WriteLine("restoring input:{0}", sourceString); } + inputConnectionRecord.DataConnectionStream = (NetworkStream)readFromStream; await SendReplayMessageAsync(readFromStream, inputConnectionRecord.LastProcessedID + 1, inputConnectionRecord.LastProcessedReplayableID + 1, ct); // Create new input task for monitoring new input Task inputTask; - inputConnectionRecord.DataConnectionStream = (NetworkStream)readFromStream; inputTask = InputDataListenerAsync(inputConnectionRecord, sourceString, ct); await inputTask; } @@ -3134,21 +3133,18 @@ private async Task InputControlListenerAsync(InputConnectionRecord inputRecord, // Find the appropriate connection record var outputConnectionRecord = _outputs[inputName]; - lock (outputConnectionRecord) + // Check to make sure this is progress, otherwise, can ignore + if (commitSeqNo > outputConnectionRecord.TrimTo && !outputConnectionRecord.WillResetConnection && !outputConnectionRecord.ConnectingAfterRestart) { - // Check to make sure this is progress, otherwise, can ignore - if (commitSeqNo > outputConnectionRecord.TrimTo && !outputConnectionRecord.WillResetConnection && !outputConnectionRecord.ConnectingAfterRestart) + outputConnectionRecord.TrimTo = Math.Max(outputConnectionRecord.TrimTo, commitSeqNo); + outputConnectionRecord.ReplayableTrimTo = Math.Max(outputConnectionRecord.ReplayableTrimTo, replayableCommitSeqNo); + if (outputConnectionRecord.ControlWorkQ.IsEmpty) { - outputConnectionRecord.TrimTo = Math.Max(outputConnectionRecord.TrimTo, commitSeqNo); - outputConnectionRecord.ReplayableTrimTo = Math.Max(outputConnectionRecord.ReplayableTrimTo, replayableCommitSeqNo); - if (outputConnectionRecord.ControlWorkQ.IsEmpty) - { - outputConnectionRecord.ControlWorkQ.Enqueue(-2); - } - lock (_committer._trimWatermarks) - { - _committer._trimWatermarks[inputName] = replayableCommitSeqNo; - } + outputConnectionRecord.ControlWorkQ.Enqueue(-2); + } + lock (_committer._trimWatermarks) + { + _committer._trimWatermarks[inputName] = replayableCommitSeqNo; } } break; @@ -3169,7 +3165,7 @@ private async Task ProcessInputMessage(InputConnectionRecord inputRecord, { case RPCByte: var methodID = inputFlexBuffer.Buffer.ReadBufferedInt(sizeBytes + 2); - if (inputFlexBuffer.Buffer[sizeBytes + 2 + StreamCommunicator.IntSize(methodID)] != (byte) RpcTypes.RpcType.Impulse) + if (inputFlexBuffer.Buffer[sizeBytes + 2 + StreamCommunicator.IntSize(methodID)] != (byte)RpcTypes.RpcType.Impulse) { inputRecord.LastProcessedReplayableID++; } @@ -3341,12 +3337,8 @@ public async Task CheckpointAsync() outputConnectionRecord = new OutputConnectionRecord(this); _outputs[kv.Key] = outputConnectionRecord; } - // Make sure RemoteTrim and RemoteTrimReplayable are atomically updated w.r.t. send - lock (outputConnectionRecord) - { - outputConnectionRecord.RemoteTrim = Math.Max(kv.Value.LastProcessedID, outputConnectionRecord.RemoteTrim); - outputConnectionRecord.RemoteTrimReplayable = Math.Max(kv.Value.LastProcessedReplayableID, outputConnectionRecord.RemoteTrimReplayable); - } + outputConnectionRecord.RemoteTrim = Math.Max(kv.Value.LastProcessedID, outputConnectionRecord.RemoteTrim); + outputConnectionRecord.RemoteTrimReplayable = Math.Max(kv.Value.LastProcessedReplayableID, outputConnectionRecord.RemoteTrimReplayable); if (outputConnectionRecord.ControlWorkQ.IsEmpty) { outputConnectionRecord.ControlWorkQ.Enqueue(-2); @@ -3424,7 +3416,7 @@ internal void RuntimeChecksOnProcessStart() OnError(MissingCheckpoint, "No checkpoint/logs directory"); } var lastCommittedCheckpoint = long.Parse(RetrieveServiceInfo("LastCommittedCheckpoint")); - if (!LogWriter.FileExists(Path.Combine(_serviceLogPath + _serviceName + "_" + _currentVersion, + if (!LogWriter.FileExists(Path.Combine(_serviceLogPath + _serviceName + "_" + _currentVersion, "server" + "chkpt" + lastCommittedCheckpoint))) { OnError(MissingCheckpoint, "Missing checkpoint " + lastCommittedCheckpoint.ToString()); @@ -3476,8 +3468,8 @@ long upgradeToVersion _sharded = false; _coral = ClientLibrary; - Console.WriteLine("Logs directory: {0}", _serviceLogPath); - + Console.WriteLine("Logs directory: {0}", _serviceLogPath); + if (createService == null) { if (LogWriter.DirectoryExists(_serviceLogPath + _serviceName + "_" + _currentVersion)) @@ -3557,7 +3549,7 @@ static void Main(string[] args) var replicaName = $"{_instanceName}{_replicaNumber}"; AmbrosiaRuntimeParams param = new AmbrosiaRuntimeParams(); param.createService = _recoveryMode == AmbrosiaRecoveryModes.A - ? (bool?) null + ? (bool?)null : (_recoveryMode != AmbrosiaRecoveryModes.N); param.pauseAtStart = _isPauseAtStart; param.persistLogs = _isPersistLogs; @@ -3613,7 +3605,7 @@ private static void ParseAndValidateOptions(string[] args) var options = ParseOptions(args, out var shouldShowHelp); ValidateOptions(options, shouldShowHelp); } - + private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp) { var showHelp = false; @@ -3651,7 +3643,7 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp) }.AddMany(registerInstanceOptionSet); var debugInstanceOptionSet = basicOptions.AddMany(new OptionSet { - + { "c|checkpoint=", "The checkpoint # to load.", c => _checkpointToLoad = long.Parse(c) }, { "cv|currentVersion=", "The version # to debug.", cv => _currentVersion = int.Parse(cv) }, { "tu|testingUpgrade", "Is testing upgrade.", u => _isTestingUpgrade = true }, @@ -3814,4 +3806,4 @@ public static string GetDescription(this Enum value) return (attribute as DescriptionAttribute)?.Description; // ?? string.Empty maybe added } } -} +} \ No newline at end of file