Aeron Archive - extend existing recording
I have an Aeron Archive and want to extend an existing recording, i.e. have it continue appending messages after the service restarts.
I couldn't find an actual example of how to do that, so I came up with my own code based on the Aeron Javadoc and the Aeron Cookbook.
What I have so far
I'm trying to get the lastRecordingId, the position etc. from the archive itself first.
    private void findLatestRecording() {
        final RecordingDescriptorConsumer consumer =
            (controlSessionId, correlationId, recordingId,
             startTimestamp, stopTimestamp, startPosition,
             stopPosition, initialTermId, segmentFileLength,
             termBufferLength, mtuLength, sessionId,
             streamId, strippedChannel, originalChannel,
             sourceIdentity) -> {
                AeronArchiveJournal.this.lastRecordingId = recordingId;
                AeronArchiveJournal.this.lastRecordingPosition = stopPosition;
                AeronArchiveJournal.this.initialTermId = initialTermId;
                AeronArchiveJournal.this.termBufferLength = termBufferLength;
            };

        final long fromRecordingId = 0L;
        final int recordCount = 100;
        final int foundCount = archive.listRecordingsForUri(
            fromRecordingId, recordCount, AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL, consumer);

        if (foundCount == 0) {
            LOG.info("No previous recording found, will start a new one");
        }
    }
Then I try to extend the recording:
    private void extendExistingRecording() {
        publication = aeron.addExclusivePublication(AeronChannels.ipc(), AeronStreams.STREAM_ID_JOURNAL);

        String channelUri = new ChannelUriStringBuilder()
            .media(CommonContext.IPC_MEDIA)
            .initialPosition(lastRecordingPosition, initialTermId, termBufferLength)
            .build();

        LOG.info("Extending existing recording");
        LOG.info("Recording id: {}", lastRecordingId);
        LOG.info("Channel URI: {}", channelUri);

        archive.extendRecording(lastRecordingId, channelUri, AeronStreams.STREAM_ID_JOURNAL, SourceLocation.LOCAL);

        LOG.info("Waiting for recording to start for session {}", publication.sessionId());
        final CountersReader counters = aeron.countersReader();
        int counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());
        while (CountersReader.NULL_COUNTER_ID == counterId) {
            journalIdleStrategy.idle();
            counterId = RecordingPos.findCounterIdBySession(counters, publication.sessionId());
        }
        lastRecordingId = RecordingPos.getRecordingId(counters, counterId);
    }
However, the CountersReader loop never finishes and Aeron prints the following warning:
io.aeron.archive.client.ArchiveEvent: WARN - cannot extend recording 0 image.joinPosition=0 != rec.stopPosition=25312
Clearly I'm missing something, but so far I'm not able to figure out what.
Solution 1:[1]
To extend an existing recording, you must set the initial position of the publication that will extend it. The stream has to be initialised at the position where the recording stopped, so that it is a genuine extension. This test shows how it is done.
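In the question's code the publication is added on the plain IPC channel, while the URI carrying the initial position is only passed to `extendRecording`. That is why the image joins at position 0 instead of 25312. The likely fix is to build the URI first and pass that same URI to `addExclusivePublication` before calling `extendRecording`. As a rough illustration of what "initial position" means, the stdlib-only sketch below reproduces the arithmetic that, as far as I understand it, `ChannelUriStringBuilder.initialPosition` performs. The class and method names are hypothetical, and the initial term id (7) and term length (64 KiB) are assumed values that do not come from the question. The parameter order may differ from what the real builder emits.

```java
/** Hypothetical helper: derives the channel parameters that describe an initial position. */
final class InitialPositionSketch {

    // Term buffer lengths are powers of two, so the shift is log2(termLength).
    static int positionBitsToShift(final int termLength) {
        return Integer.numberOfTrailingZeros(termLength);
    }

    // Term in which the given stream position falls.
    static int termId(final long position, final int initialTermId, final int termLength) {
        return (int) (position >>> positionBitsToShift(termLength)) + initialTermId;
    }

    // Byte offset of the position inside that term.
    static int termOffset(final long position, final int termLength) {
        return (int) (position & (termLength - 1));
    }

    // Channel URI a publication would need in order to start at 'position'.
    static String ipcChannel(final long position, final int initialTermId, final int termLength) {
        return "aeron:ipc?init-term-id=" + initialTermId
            + "|term-id=" + termId(position, initialTermId, termLength)
            + "|term-offset=" + termOffset(position, termLength)
            + "|term-length=" + termLength;
    }

    public static void main(final String[] args) {
        // 25312 is the stopPosition from the warning; 7 and 64 KiB are assumed values.
        System.out.println(ipcChannel(25312L, 7, 64 * 1024));
    }
}
```

With the real API, the equivalent would be to use the `channelUri` built with `initialPosition(lastRecordingPosition, initialTermId, termBufferLength)` for both `aeron.addExclusivePublication(...)` and `archive.extendRecording(...)`.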
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Martin Thompson |