Comments (8)

vyazelenko commented on June 2, 2024

If the Image is created as the result of starting a replay, it will be receiving data from the recording (i.e. from the replay Publication). Image.position() returns the position consumed (polled) so far from the published stream.

You can use Image.position() as the latest processed position, but only if you already have an Image. That requires a Subscription, which in turn relies on the replay having been started.
The code to start a replay will be something along these lines:

final Aeron aeron = ...;
final AeronArchive archive = ...;
final IdleStrategy idleStrategy = ...;
final String replayChannel = ...;
final int replayStreamId = ...;
final long recordingId = ...;
final long position = ...;
// The low 32 bits of the returned replay session id match Image.sessionId() of the replay.
final int sessionId = (int)archive.startReplay(
    recordingId, position, Long.MAX_VALUE, replayChannel, replayStreamId);

// Subscribe to the replay session only, so that no other Image is picked up.
final String replaySessionChannel = ChannelUri.addSessionId(replayChannel, sessionId);
final Subscription subscription = aeron.addSubscription(replaySessionChannel, replayStreamId);

// Wait until the replay Image becomes available.
Image image;
idleStrategy.reset();
while (null == (image = subscription.imageBySessionId(sessionId)))
{
    idleStrategy.idle();
}
...

There are multiple examples of this in the Aeron code base (e.g. BasicArchiveTest#shouldRecordThenBoundReplayWithCounter).

from aeron.

JPWatson commented on June 2, 2024

Hey @dengdaoping, the lastProcessedPosition you are using doesn't line up with a message boundary. Worth double-checking where you're getting that value from/making sure that you're replaying the correct recording.

dengdaoping commented on June 2, 2024

Hey @JPWatson

    var ctx = new Aeron.Context().aeronDirectoryName(aeronDir);
    ctx.availableImageHandler(
        (image) -> {
            aeronImage = image;
        });
    aeron = Aeron.connect(ctx);
    aeronArchive = AeronArchive.connect(new AeronArchive.Context()
        .controlRequestChannel(archiveRequestChannel)
        .controlRequestStreamId(archiveRequestStream)
        .controlResponseChannel(archiveResponseChannel)
        .controlResponseStreamId(archiveResponseStream)
        .aeron(aeron));

I'll get the image first, and then I'll get the position from the image.
lastProcessedPosition = aeronImage.position()

private long findLatestRecording(final AeronArchive archive, String channel, int stream)
{
   final MutableLong lastRecordingId = new MutableLong();
   final RecordingDescriptorConsumer consumer =
       (controlSessionId, correlationId, recordingId,
       startTimestamp, stopTimestamp, startPosition,
       stopPosition, initialTermId, segmentFileLength,
       termBufferLength, mtuLength, sessionId,
       streamId, strippedChannel, originalChannel,
       sourceIdentity) -> lastRecordingId.set(recordingId);

   final long fromRecordingId = 0L;
   final int recordCount = 100;
   final int foundCount = archive.listRecordingsForUri(fromRecordingId, recordCount, channel, stream, consumer);

   if (foundCount == 0)
   {
       throw new IllegalStateException("no recordings found");
   }
   return lastRecordingId.get();
}

recordingId = findLatestRecording(aeronArchive, channel, streamId);
Is it possible that we are not getting the recordingId in the right way?

JPWatson commented on June 2, 2024

It looks like you're getting the latest recording ID with the provided stream ID/channel info (assuming there are fewer than 100 matching recordings). I can't say from the snippets you've shared whether the lastProcessedPosition is from that recording.
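
The 100-recording limit mentioned above can be lifted by paging through the listing. The following is a minimal sketch of that idea, not code from the Aeron code base: `RecordingLister` is a hypothetical stand-in for `AeronArchive#listRecordingsForUri` reduced to the arguments that matter here, and `findLatest` and `NOT_FOUND` are illustrative names. It assumes the listing reports recordings in ascending recordingId order starting at `fromRecordingId`.

```java
import java.util.function.LongConsumer;

final class LatestRecordingFinder
{
    /** Hypothetical stand-in for AeronArchive#listRecordingsForUri: reports ids, returns the count. */
    interface RecordingLister
    {
        int list(long fromRecordingId, int recordCount, LongConsumer recordingIdConsumer);
    }

    static final long NOT_FOUND = -1L;

    /** Pages through all matching recordings and returns the highest recordingId seen. */
    static long findLatest(final RecordingLister lister, final int pageSize)
    {
        if (pageSize <= 0)
        {
            throw new IllegalArgumentException("pageSize must be positive: " + pageSize);
        }

        final long[] latest = { NOT_FOUND };
        long fromRecordingId = 0L;

        while (true)
        {
            final int found = lister.list(
                fromRecordingId, pageSize, (id) -> latest[0] = Math.max(latest[0], id));

            // A short page means the listing is exhausted.
            if (found < pageSize)
            {
                return latest[0];
            }

            // Continue after the highest id seen so far.
            fromRecordingId = latest[0] + 1;
        }
    }
}
```

In a real application the lambda passed as `RecordingLister` would delegate to the archive client. AeronArchive also has a findLastMatchingRecording method that may make manual paging unnecessary; its exact behaviour should be checked against the Javadoc of the Aeron version in use.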

dengdaoping commented on June 2, 2024

Thanks @JPWatson. What is the right way to get the latest recordingId and lastProcessedPosition?

vyazelenko commented on June 2, 2024

@dengdaoping Two issues:

  • AeronArchive#listRecordingsForUri can return multiple recordings for the same channel/stream. Your code will blindly use the last recordingId from the list.

  • Aeron.Context#availableImageHandler(AvailableImageHandler) sets a default AvailableImageHandler which will be called when any Image is available. This callback will only be called if it is not overridden with the instance-specific one and when a Subscription is connected to a Publication. Finally, Image.position() returns the current position of an Image which at the beginning will be equal to the join position, i.e. the position of the Publication.

    If I understand correctly, you want to use the last processed position, i.e. the position up to which the application has consumed from the recording? If this is the case, you'll need to track this position in the application and persist it somewhere (e.g. database, local file etc.) so that you can read it back when the application restarts, if you want to avoid missing messages. Alternatively, you can always start the replay from the startPosition of the recording and deal with duplicate message processing.
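
The "persist it somewhere" option could look roughly like the sketch below, which uses a local file. It is an illustration under assumptions, not an Aeron API: `PositionStore`, `save` and `loadPosition` are hypothetical names, and storing the recordingId next to the position is a design choice made here so that a position saved for one recording is never reused for another.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class PositionStore
{
    private final Path file;

    PositionStore(final Path file)
    {
        this.file = file;
    }

    /** Stores "recordingId position" by writing a temporary file and renaming it over the target. */
    void save(final long recordingId, final long position) throws IOException
    {
        final Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        final String line = recordingId + " " + position;
        Files.write(tmp, line.getBytes(StandardCharsets.US_ASCII));
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    /** Returns the saved position if it belongs to recordingId, otherwise defaultPosition. */
    long loadPosition(final long recordingId, final long defaultPosition) throws IOException
    {
        if (!Files.exists(file))
        {
            return defaultPosition;
        }

        final String content = new String(Files.readAllBytes(file), StandardCharsets.US_ASCII);
        final String[] parts = content.trim().split(" ");

        return Long.parseLong(parts[0]) == recordingId ? Long.parseLong(parts[1]) : defaultPosition;
    }
}
```

On restart, the application would pass the startPosition of the recording as `defaultPosition`, so that an unknown or mismatched checkpoint falls back to the "replay from the start and handle duplicates" alternative.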

dengdaoping commented on June 2, 2024

@vyazelenko Possibly a stupid question, but just to confirm: if an Image position and a recording position have the same value, do they refer to the same data?

dengdaoping commented on June 2, 2024

For example, I start the replay like this:

    recordingId = findLatestRecording(aeronArchive, channel, streamId);
    sessionId = aeronArchive.startReplay(
        recordingId,
        lastProcessedPosition,
        Long.MAX_VALUE,
        channel,
        replayStreamId);

Whenever the application consumes from a recording, it stores the position in Redis:
redis.set("lastProcessedPosition", aeronImage.position())

The problem persists when the service is restarted
ERROR - response for correlationId=15, error: 59232 position not aligned to a data header, recordingId=5, sessionId=27680399597.
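
A quick arithmetic check on the position in this error may help narrow things down. The sketch below assumes Aeron's data frame header is 32 bytes and that frames are aligned to 32 bytes; both constants are written out here as assumptions rather than taken from the Aeron classes, and the helper names are illustrative. Under that assumption 59232 is a multiple of 32, so the value passes the simple alignment test. That would suggest the archive rejects it because no frame of recording 5 starts at that offset, which is consistent with the earlier remark that the position may come from a different stream or recording.

```java
final class FrameAlignment
{
    // Assumed values for Aeron's data frame layout; not read from the Aeron library.
    static final int HEADER_LENGTH = 32;
    static final int FRAME_ALIGNMENT = 32;

    /** True if the position is a multiple of the frame alignment. */
    static boolean isFrameAligned(final long position)
    {
        return (position & (FRAME_ALIGNMENT - 1)) == 0;
    }

    /** Bytes by which the position advances for one unfragmented message of the given length. */
    static long alignedFrameLength(final int messageLength)
    {
        final long unaligned = (long)messageLength + HEADER_LENGTH;
        return (unaligned + (FRAME_ALIGNMENT - 1)) & ~(long)(FRAME_ALIGNMENT - 1);
    }
}
```

Being a multiple of 32 is necessary but not sufficient: a valid replay position must also coincide with the start of a frame in that particular recording, and message lengths decide where those starts fall.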
