[Suggestion] Stream - reset offset auto tracking for consumer #14124
RabbitMQ series: 4.1.x
Operating system (distribution) used: AlmaLinux
How is RabbitMQ deployed? RPM package
What would you like to suggest for a future version of RabbitMQ?

It would be useful to be able to reset a stream's server-side automatic offset tracking for a given consumer. I am aware that we can use manual offset tracking or change the name of the consumer, but that is a messy workaround. It would be nice to leverage RabbitMQ's automatic offset tracking. Changing the tracking strategy to manual after automatic tracking (for a consumer with the same name) is also not possible, as it is denied with a message.

The use case for resetting a consumer's offset is rereading a stream without changing the consumer's name and while leaving the stream data intact. We have an application that uses partial data from a stream but later needs more data from it, so rereading the stream is crucial. A secondary use case is clearing offset tracking on a long-lived stream.
Replies: 2 comments 5 replies
I created a small JBang util to reset the stream offset so far. Run it with JBang:

///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS info.picocli:picocli:4.6.3
//DEPS com.rabbitmq:stream-client:0.18.0
//DEPS org.apache.logging.log4j:log4j-api:2.20.0
//DEPS org.apache.logging.log4j:log4j-core:2.20.0
//DEPS org.apache.logging.log4j:log4j-slf4j-impl:2.20.0

import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import java.util.concurrent.Callable;
import com.rabbitmq.stream.impl.Client;

@Command(name = "rabbitmq_stream_change_offset", mixinStandardHelpOptions = true, version = "rabbitmq_stream_change_offset 0.1",
        description = "rabbitmq_stream_change_offset made with jbang")
class rabbitmq_stream_change_offset implements Callable<Integer> {

    @Option(names = { "--host" }, description = "RabbitMQ host", arity = "1")
    private String host;

    @Option(names = { "--port" }, description = "RabbitMQ stream port", defaultValue = "5552")
    private int port;

    @Option(names = { "--user" }, description = "RabbitMQ user username", arity = "1")
    private String username;

    @Option(names = { "--ref" }, description = "Consumer reference for reset (usually app name)", arity = "1")
    private String reference; // stream reference ~ app name

    @Option(names = { "--stream" }, description = "RabbitMQ stream to reset", arity = "1")
    private String stream;

    @Option(names = { "--offset" }, description = "New stream offset to set", defaultValue = "0")
    private long offset;

    public static void main(String... args) {
        int exitCode = new CommandLine(new rabbitmq_stream_change_offset()).execute(args);
        System.exit(exitCode);
    }

    @Override
    public Integer call() throws Exception {
        System.out.println("Given config for stream offset change:");
        System.out.println(" -- RabbitMQ = " + host + ":" + port);
        System.out.println(" -- stream = " + stream);
        System.out.println(" -- consumer ref = " + reference);
        System.out.println(" -- offset = " + offset);
        System.out.println("\n");

        // System.console() is null when no interactive terminal is attached
        if (System.console() == null) {
            System.out.println("No console available to read the password");
            return 1;
        }
        final String password = new String(System.console().readPassword("Password for user '" + username + "': "));
        if (password.isBlank()) {
            System.out.println("No password given");
            return 1;
        }

        System.out.println("Creating new client");
        final Client client = new Client(
                new Client.ClientParameters()
                        .host(host)
                        .port(port)
                        .username(username)
                        .password(password)
        );
        try {
            final Client.QueryOffsetResponse queryOffsetResponse = client.queryOffset(reference, stream);
            // Response code 1 means OK in the stream protocol
            if (queryOffsetResponse.getResponseCode() != 1) {
                System.out.println("Could not query offset - responseCode=" + queryOffsetResponse.getResponseCode());
                return 1;
            }
            System.out.println("Changing offset from " + queryOffsetResponse.getOffset() + " to " + offset);
            if (queryOffsetResponse.getOffset() == offset) {
                System.out.println("No need to change offset");
                return 0;
            }
            // No confirmation is returned here; queryOffset can be polled to verify the write
            client.storeOffset(reference, stream, offset);
            System.out.println("Done");
            return 0;
        } finally {
            // Close the connection on every path, including the early returns above
            System.out.println("Closing client");
            client.close();
        }
    }
}
Would you mind experimenting with the Client#storeOffset(String, String, long) and Client#queryOffset(String, String) methods?

You need to create a Client instance alongside the Environment, but that should be OK for development. You can then store offset 0 for the consumer (same stream and reference/name), make sure it has been written (by polling with queryOffset) and then start the consumer. We could then discuss a proper API if the experiment addresses your use case.
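
The store-then-poll sequence described above could look roughly like the sketch below. It is only an illustration: OffsetStore and InMemoryOffsetStore are hypothetical names that do not exist in the stream client. They are there so the example runs without a broker. In a real experiment the two interface methods would presumably delegate to Client#storeOffset(String, String, long) and Client#queryOffset(String, String). The attempt count and pause are arbitrary assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OffsetResetSketch {

  /** Hypothetical abstraction over "store an offset" and "read it back". */
  interface OffsetStore {
    void store(String reference, String stream, long offset);

    long query(String reference, String stream);
  }

  /** In-memory stand-in for the broker so the sketch runs on its own. */
  static class InMemoryOffsetStore implements OffsetStore {
    private final Map<String, Long> offsets = new ConcurrentHashMap<>();

    @Override
    public void store(String reference, String stream, long offset) {
      offsets.put(reference + "/" + stream, offset);
    }

    @Override
    public long query(String reference, String stream) {
      // -1 is an arbitrary marker for "nothing stored yet" in this sketch
      return offsets.getOrDefault(reference + "/" + stream, -1L);
    }
  }

  /**
   * Stores the target offset, then polls until the stored value matches it.
   * Returns true once the write is visible, false if all attempts are used up.
   */
  static boolean resetAndConfirm(OffsetStore store, String reference, String stream,
      long target, int maxAttempts, long pauseMillis) throws InterruptedException {
    store.store(reference, stream, target);
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      if (store.query(reference, stream) == target) {
        return true;
      }
      Thread.sleep(pauseMillis);
    }
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    OffsetStore store = new InMemoryOffsetStore();
    // Simulate a consumer that has already tracked some progress
    store.store("my-app", "my-stream", 42L);

    boolean confirmed = resetAndConfirm(store, "my-app", "my-stream", 0L, 10, 100L);
    System.out.println("offset reset confirmed: " + confirmed);
  }
}
```

Only once resetAndConfirm returns true would the consumer be started with the same stream and reference, so that it picks up from the stored offset.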