Comments (8)
If the Image
is created as the result of starting a replay it will be receiving data from the recording (i.e. replay Publication
). The Image.position()
returns the currently consumed (polled) position from the published stream.
You can use Image.position()
as the latest processed position but only if you already have an Image
. Which requires a Subscription
which in turn relies on the replay being started.
The code to start a replay will be something along the lines:
final Aeron aeron = ...;
final AeronArchive archive = ...;
final String replayChannel = ...;
final int replayStreamId = ...;
final long position = ...;
final int sessionId = (int)archive.startReplay(
recordingId, position, Long.MAX_VALUE, replayChannel, replayStreamId);
final String replaySessionChannel = ChannelUri.addSessionId(replayChannel, sessionId);
final Subscription subscription = aeron.addSubscription(replaySessionChannel, replayStreamId);
Image image;
idleStrategy.reset();
while (null == (image = subscription.imageBySessionId(sessionId)))
{
idleStrategy.idle();
}
...
There are multiple examples of this in Aeron code base (e.g. BasicArchiveTest#shouldRecordThenBoundReplayWithCounter).
from aeron.
Hey @dengdaoping, the lastProcessedPosition you are using doesn't line up with a message boundary. Worth double-checking where you're getting that value from/making sure that you're replaying the correct recording.
from aeron.
Hey @JPWatson
var ctx = new Aeron.Context().aeronDirectoryName(aeronDir);
ctx.availableImageHandler(
(image) -> {
aeronImage = image;
});
aeron = Aeron.connect(ctx);
aeronArchive = AeronArchive.connect(new AeronArchive.Context()
.controlRequestChannel(archiveRequestChannel)
.controlRequestStreamId(archiveRequestStream)
.controlResponseChannel(archiveResponseChannel)
.controlResponseStreamId(archiveResponseStream)
.aeron(aeron));
I'll get the image first, and then I'll get the position from the image.
lastProcessedPosition = aeronImage.position()
private long findLatestRecording(final AeronArchive archive, String channel, int stream)
{
final MutableLong lastRecordingId = new MutableLong();
final RecordingDescriptorConsumer consumer =
(controlSessionId, correlationId, recordingId,
startTimestamp, stopTimestamp, startPosition,
stopPosition, initialTermId, segmentFileLength,
termBufferLength, mtuLength, sessionId,
streamId, strippedChannel, originalChannel,
sourceIdentity) -> lastRecordingId.set(recordingId);
final long fromRecordingId = 0L;
final int recordCount = 100;
final int foundCount = archive.listRecordingsForUri(fromRecordingId, recordCount, channel, stream, consumer);
if (foundCount == 0)
{
throw new IllegalStateException("no recordings found");
}
return lastRecordingId.get();
}
recordingId = findLatestRecording(aeronArchive, channel, streamId);
Is it possible that we are not getting the recordingId in the right way?
from aeron.
It looks like you're getting the latest recording ID with the provided stream ID/channel info (assuming there are fewer than 100 matching recordings). I can't say from the snippets you've shared whether the lastProcessedPosition is from that recording.
from aeron.
Thanks @JPWatson , please the right way, how do we get the latest recordingId and lastProcessedPosition?
from aeron.
@dengdaoping Two issues:
-
AeronArchive#listRecordingsForUri
can return multiple recordings for the same channel/stream. Your code will blindly use the lastrecordingId
from the list. -
Aeron.Context#availableImageHandler(AvailableImageHandler)
sets a defaultAvailableImageHandler
which will be called when anyImage
is available. This callback will only be called if it is not overridden with the instance-specific one and when aSubscription
is connected to aPublication
. Finally,Image.position()
returns the current position of anImage
which at the beginning will be equal to the join position, i.e. the position of thePublication
.If I understand correctly, you want to use the last processed position which is a position up to which the application has consumed from the recording? If this the case, you'll need track this position in the application and persist it somewhere (e.g. database, local file etc.) so that you can get its value when the application restarts if you want to avoid missing messages. Alternatively, you can always start replay from the
startPosition
of the recording and deal with duplicate message processing.
from aeron.
@vyazelenko Possibly stupid question, to confirm with you, Image Position and Recording Position, if the value in position is the same, the corresponding data is the same?
from aeron.
For example, I start startReplay
recordingId = findLatestRecording(aeronArchive, channel, streamId);
sessionId = aeronArchive.startReplay(
recordingId,
lastProcessedPosition,
Long.MAX_VALUE,
channel,
replayStreamId);
Whenever an application consumes from a recording, it stores the position into redis
redis.set("lastProcessedPosition", aeronImage.position())
The problem persists when the service is restarted
ERROR - response for correlationId=15, error: 59232 position not aligned to a data header, recordingId=5, sessionId=27680399597.
from aeron.
Related Issues (20)
- aeronmd.c Closing multi-publisher IPC publication HOT 2
- Cannot set thread affinity in shared or sharednetwork modes for c media driver HOT 3
- list-members(ClusterTool) command does not show isLeader accurately
- big latency while transmit small packets cross different AWS zone over Aeron comparing with raw UDP HOT 2
- AeronCluster.AsyncConnect can forget to close subscription HOT 1
- [C++] `ReplayMerge` with multicast live destination doesn't merge. HOT 10
- AeronCluster.java decoding order issue HOT 3
- OpenTelemetry Integration
- Invoke fileChannel's force method before close HOT 2
- Heartbeats being sent, despite no publishing. HOT 5
- `ReplayMerge::doWork` throws exceptions without descriptions.
- ReplayMerge join position is greater than the replay position HOT 4
- AeronCluster client (gateway) - SIGSEGV HOT 1
- Set thread name to "client-conductor" fails.
- aeron ping-pong example build should detect sendmmsg
- code examples for C or C++ HOT 2
- Archive ConductorServiceTimeoutException when using `useConductorAgentInvoker` HOT 2
- [C Media Driver]: Custom poller and receiver functions HOT 2
- Entire cluster of 3 members getting stuck if one of the followers gets stuck HOT 3
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from aeron.