How to Read Kafka Source Offsets with Flink’s State Processor API

DeltaStream
6 min readSep 20, 2024

--

by Charles Tan

Apache Flink is one of the most popular frameworks for data stream processing. As a stateful processing engine, Flink is able to handle processing logic with aggregations, joins, and windowing. To ensure that Flink jobs are recoverable with exactly-once semantics, Flink has a state-of-the-art state snapshotting mechanism, so in the event of a failure, the job can be resumed from the latest snapshot.

In some advanced use cases, such as job migrations or job auditing, users may be required to inspect or modify their Flink job’s state snapshots (called Savepoints and Checkpoints in Flink). For this purpose, Flink provides the State Processor API. However, this API is not always straightforward to use and requires deep understanding of Flink operator states.

In this post, we’ll cover an example of using the State Processor API, broken up into 3 parts:

  1. Introduce our Flink job which reads data from an Apache Kafka topic
  2. Deep dive into how Flink’s KafkaSource maintains its state
  3. Use the State Processor API to extract the Kafka partition-offset state from the Flink job’s savepoint/checkpoint

If you want to see an example of the State Processor API in use, feel free to skip ahead to the last section.

Note that this post is a technical tutorial for those who want to get started with the State Processor API, and is intended for readers who already have some familiarity with Apache Flink and stream processing concepts.

Creating a Flink Job

Below is the Java code for our Flink job. This job simply reads from the “source” topic in Kafka, deserializes the records as simple Strings, then writes the results to the “sink” topic.

public class FlinkTest {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource source = KafkaSource.builder()
.setBootstrapServers("localhost:9092")
.setTopics("source")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

DataStream sourceStream = env.fromSource(
source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source")
.uid("kafkasourceuid");

KafkaRecordSerializationSchema serializer = KafkaRecordSerializationSchema.builder()
.setValueSerializationSchema(new SimpleStringSchema())
.setTopic("sink")
.build();
Properties kprops = new Properties();
kprops.setProperty("transaction.timeout.ms", "300000"); // e.g., 5 mins
KafkaSink sink = KafkaSink.builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(serializer)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setKafkaProducerConfig(kprops)
.setTransactionalIdPrefix("txn-prefix")
.build();

sourceStream.sinkTo(sink);
env.enableCheckpointing(10000L);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints");
env.execute("tester");
}
}

There are a few important things to note from this Flink job:

  1. We are creating a KafkaSource object.On line 6, the KafkaSource is then given to the StreamExecutionEnvironment’s fromSource method which returns a DataStreamSource object, which represents the actual Flink source operator.
  2. We set the operator ID for our KafkaSource operator using the uid method.This is set on line 16. It’s best practice to set all the IDs for all Flink operators when possible, but we’re emphasizing this here because we’ll actually need to refer to this ID when we use the State Processor API to inspect the state snapshots.
  3. Flink checkpointing is turned on.On lines 33–38, we are configuring our Flink environment’s checkpointing configurations to ensure that the Flink job will take incremental checkpoints. We’ll be analyzing these checkpoints later on.

Understanding the KafkaSource State

Before we inspect the checkpoints generated from our test Flink job, we first need to understand how the KafkaSource Flink operator saves its state.

As we’ve already mentioned, we’re using Flink’s KafkaSource to connect to our source Kafka data. Flink sources have 3 main components — Split, SourceReader, SplitEnumerator (Flink docs). A Split represents a portion of data that a source consumes and is the granularity that the source can parallelize reading data. For the KafkaSource, each Kafka partition corresponds to a separate Split, represented by the KafkaPartitionSplit class. The KafkaPartitionSplit is serialized by the KafkaPartitionSplitSerializer class. The logic for this serializer is pretty simple, it writes out a byte array of the Split’s topic, partition, and offset.

KafkaPartitionSplitSerializer’s serialize method:

@Override
public byte[] serialize(KafkaPartitionSplit split) throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
out.writeUTF(split.getTopic());
out.writeInt(split.getPartition());
out.writeLong(split.getStartingOffset());
out.writeLong(split.getStoppingOffset().orElse(KafkaPartitionSplit.NO_STOPPING_OFFSET));
out.flush();
return baos.toByteArray();
}
}

At runtime, Flink will instantiate all of the operators, including the SourceOperator objects. For each stateful Flink operator, there is a name associated with each stateful object. In the case of a source operator, the name associated with the split states are defined by SPLIT_STATE_DESC.

static final ListStateDescriptor SPLITS_STATE_DESC =
new ListStateDescriptor("SourceReaderState", BytePrimitiveArraySerializer.INSTANCE);

We can inspect the SourceOperator class further to see where these split states are initialized, in the initializeState method.

SourceOperator’s initializeState method:

@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
final ListState rawState =
context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
readerState = new SimpleVersionedListState(rawState, splitSerializer);
}

The Flink state that source operators use is the SimpleVersionedListState, which uses the SimpleVersionedSerialization class. In the SimpleVersionedListState class, the serialize method calls the writeVersionAndSerialize method to ultimately serialize the state.

Finally, if we inspect the writeVersionAndSerialize method in the SimpleVersionedSerialization, we can see that before writing the actual data associated with our source operator, we first write out a few bytes for the serializer version and the data’s length.

SimpleVersionedSerialization’s writeVersionAndSerialize method:

public static  void writeVersionAndSerialize(
SimpleVersionedSerializer serializer, T datum, DataOutputView out)
throws IOException {
checkNotNull(serializer, "serializer");
checkNotNull(datum, "datum");
checkNotNull(out, "out");

final byte[] data = serializer.serialize(datum);

out.writeInt(serializer.getVersion());
out.writeInt(data.length);
out.write(data);
}

Let’s quickly recap the important parts from above:

  1. The KafkaSource operator stores its state in KafkaPartitionSplit objects.
  2. The KafkaPartitionSplit keeps track of the current topic, partition, and offset that the KafkaSource has last processed.
  3. When Flink savepointing/checkpointing occurs, a byte array representing the KafkaSource state gets written to the state snapshot. The byte array has a header which includes the serializer version and the length of data. Then the actual state data, which is a serialized version of the KafkaPartitionSplit, makes up the rest of the state byte array.

Now that we have some idea of how data is being serialized into Flink savepoints and checkpoints, let’s see how we can use the State Processor API to extract the Kafka source operator information from these state snapshots.

State Processor API to Inspect Kafka Source State

For maven projects, you can add the following dependency to your pom.xml file to start using the Flink State Processor API.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api</artifactId>
<version>1.18.0</version>
</dependency>

The following class showcases the full example of how we can use the State Processor API to read KafkaSource offsets from a Flink savepoint or checkpoint.

public class StateProcessorTest {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String savepointPath = Path.of("/tmp/checkpoints/609bc335486ca6cfcc8692e4c1ff8782/chk-8").toString();
SavepointReader savepoint = SavepointReader.read(env, savepointPath, new HashMapStateBackend());
DataStream listState = savepoint.readListState(
OperatorIdentifier.forUid("kafkasourceuid"),
"SourceReaderState",
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
CloseableIterator states = listState.executeAndCollect();
while (states.hasNext()) {
byte[] s = states.next();
KafkaPartitionSplitSerializer serializer = new KafkaPartitionSplitSerializer();
KafkaPartitionSplit split = serializer.deserialize(serializer.getVersion(), Arrays.copyOfRange(s, 8, s.length));
System.out.println(
String.format("topic=%s, partition=%s, startingOffset=%s, stoppingOffset=%s, topicPartition=%s",
split.getTopic(), split.getPartition(),
split.getStartingOffset(), split.getStoppingOffset(), split.getTopicPartition()));
}

System.out.println("DONE");
}
}

First, we’ll load the savepoint. The SavepointReader class from the State Processor API allows us to load a full savepoint or checkpoint. On line 7, we are loading a checkpoint that was created in “/tmp/checkpoints” as a result of running the test Flink job. As we mentioned in the previous section, the source operators use a SimpleVersionedListState, which the SavepointReader can read using the readListState method. When reading the list states, we need to know 3 things:

  1. Operator ID:“kafkasourceuid” set in our test Flink job
  2. State Name:“SourceReaderState” set in Flink’s SourceOperator class
  3. State TypeInformation:PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO set in Flink’s SourceOperator class

After we get our list states, we can simply iterate through each of the states, which are given as byte arrays. Since the SimpleVersionedSerialization serializer first writes the version and data length, which we don’t care about, we need to skip those headers. You’ll see on line 16 that we deserialize the byte array as a KafkaPartitionSplit after skipping the first 8 bytes of the state byte array.

Running the above code example gives the following result:

topic=source, partition=0, startingOffset=3, stoppingOffset=Optional.empty, topicPartition=source-0
DONE

Conclusion

In this post, we explained how Flink’s KafkaSource state is serialized into savepoints and covered an example of reading this state with the State Processor API. Flink’s State Processor API can be a powerful tool to analyze and modify Flink savepoints and checkpoints. However, it can be confusing for beginners to use and requires some in-depth knowledge about how the Flink operators manage their individual states. Hopefully this guide will help you understand the KafkaSource and serve as a good tutorial for getting started with the State Processor API.

For more content about Flink and stream processing, check out more content from DeltaStream’s blog. DeltaStream is a platform that simplifies the unification, processing, and governance of streaming data.

Resources:

--

--

DeltaStream
DeltaStream

Written by DeltaStream

DeltaStream provides a unified serverless platform to manage, secure and process all your streams - powered by Apache Flink.

No responses yet