
How to configure cron so file is not processed multiple times (duplicate file)  #91

Open
@tawabocah

Description


This might be the same issue as #89, but I've tried several cron expressions and nothing works. I haven't changed offset.flush.interval.ms, so it uses the default of 60000 ms (as stated here).

This might be a bug in the deletion process, since I've tried various cron expressions:

  • every 30 seconds
  • every 1 minute
  • every 5 minutes
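For context on the expression format: policy.cron.expression takes a Quartz-style cron expression with seconds first and an optional trailing year field (7 fields total). A tiny sketch that just labels the fields of the expressions I tried (the field names are my reading of the Quartz format, not something the connector exposes):

```python
# Quartz-style 7-field cron: seconds come first, year is optional at the end.
FIELDS = ["seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year"]

def label(expr: str) -> dict:
    """Map each field of a 7-field cron expression to its Quartz field name."""
    return dict(zip(FIELDS, expr.split()))

# e.g. "1 0/1 * ? * * *" fires at second 1 of every minute
print(label("1 0/1 * ? * * *"))
```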

Using Kafka from Docker (docker.io/bitnami/kafka:3) with Kafka Connect (confluentinc/cp-kafka-connect:7.0.1).

To replicate:

  1. Create a connector with the following config:
{
    "name": "test-connector",
    "config": {
        "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
        "tasks.max": "1",
        "fs.uris": "file:///data/text-files",
        "topic": "t-fs-demo",
        "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.CronPolicy",
        "policy.recursive": "true",
        "policy.regexp": ".*",
        "policy.batch_size": "0",
        "policy.cleanup": "delete",
        "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader",
        "file_reader.batch_size": "0",
        "policy.cron.expression": "1 0/1 * ? * * *"
    }
}
  2. Put a txt file in the fs.uris folder. The file contains only one line for testing.
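For anyone reproducing this, the connector config above can be submitted to the Kafka Connect REST API. A sketch using only the Python standard library (the localhost:8083 address is the default Connect REST listener and may differ in your setup):

```python
import json
import urllib.request

# Same connector config as in the repro steps, wrapped for POST /connectors.
connector = {
    "name": "test-connector",
    "config": {
        "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
        "tasks.max": "1",
        "fs.uris": "file:///data/text-files",
        "topic": "t-fs-demo",
        "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.CronPolicy",
        "policy.recursive": "true",
        "policy.regexp": ".*",
        "policy.batch_size": "0",
        "policy.cleanup": "delete",
        "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader",
        "file_reader.batch_size": "0",
        "policy.cron.expression": "1 0/1 * ? * * *",
    },
}

payload = json.dumps(connector).encode()
req = urllib.request.Request(
    "http://localhost:8083/connectors",  # assumed Connect REST address
    data=payload,
    headers={"Content-Type": "application/json"},
    method="POST",
)
# urllib.request.urlopen(req)  # uncomment with a running Connect worker
```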

Expected:

  • file content published to Kafka once
  • file deleted

Actual:

  • file content published to Kafka multiple times
  • file deleted (only after being processed repeatedly)

So the Kafka consumer output looks like this:


Every minute

  • cron expression: 1 0/1 * ? * * * (fires once every minute): file processed twice before being deleted

Kafka consumer:

Struct{value=file 1 line 1}
Struct{value=file 1 line 1}

Kafka Connect log

kafka-connect  | [2022-01-30 03:45:30,156] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 03:46:01,035] INFO [test-connector|task-0] FsSourceTask Processing records for file [path = file:/data/text-files/test-1.txt, length = 13, blocks = [[offset = 0, length = 13, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:93)
kafka-connect  | [2022-01-30 03:46:30,157] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 03:47:01,042] INFO [test-connector|task-0] FsSourceTask Processing records for file [path = file:/data/text-files/test-1.txt, length = 13, blocks = [[offset = 0, length = 13, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:93)
kafka-connect  | [2022-01-30 03:47:30,157] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 03:48:01,023] INFO [test-connector|task-0] CronPolicy Deleting file [file:/data/text-files/test-1.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.CronPolicy:294)

Every 30 seconds

  • cron expression: 0/30 * * ? * * * (every 30 seconds): file processed three times before being deleted

Kafka consumer:

Struct{value=file 2 line 1}
Struct{value=file 2 line 1}
Struct{value=file 2 line 1}

Kafka Connect log

kafka-connect  | [2022-01-30 03:50:00,028] INFO [test-connector|task-0] FsSourceTask Processing records for file [path = file:/data/text-files/test-2.txt, length = 13, blocks = [[offset = 0, length = 13, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:93)
kafka-connect  | [2022-01-30 03:50:18,386] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 03:50:30,048] INFO [test-connector|task-0] FsSourceTask Processing records for file [path = file:/data/text-files/test-2.txt, length = 13, blocks = [[offset = 0, length = 13, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:93)
kafka-connect  | [2022-01-30 03:51:00,041] INFO [test-connector|task-0] FsSourceTask Processing records for file [path = file:/data/text-files/test-2.txt, length = 13, blocks = [[offset = 0, length = 13, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:93)
kafka-connect  | [2022-01-30 03:51:18,386] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 03:51:30,025] INFO [test-connector|task-0] CronPolicy Deleting file [file:/data/text-files/test-2.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.CronPolicy:294)

Every 5 minutes

  • cron expression: 1 0/5 * ? * * * (every 5 minutes): file processed twice before being deleted

Kafka consumer:

Struct{value=file 3 line 1}
Struct{value=file 3 line 1}

Kafka Connect log

kafka-connect  | [2022-01-30 03:55:01,052] INFO [test-connector|task-0] FsSourceTask Processing records for file [path = file:/data/text-files/test-3.txt, length = 13, blocks = [[offset = 0, length = 13, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:93)
kafka-connect  | [2022-01-30 03:55:43,908] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 03:56:43,909] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 03:57:43,910] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 03:58:43,912] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 03:59:43,913] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 04:00:01,058] INFO [test-connector|task-0] FsSourceTask Processing records for file [path = file:/data/text-files/test-3.txt, length = 13, blocks = [[offset = 0, length = 13, corrupt = false]]]... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:93)
kafka-connect  | [2022-01-30 04:00:43,914] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 04:01:43,924] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 04:02:43,924] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 04:03:43,925] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 04:04:43,927] INFO [test-connector|task-0|offsets] WorkerSourceTask{id=test-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
kafka-connect  | [2022-01-30 04:05:01,024] INFO [test-connector|task-0] CronPolicy Deleting file [file:/data/text-files/test-3.txt] due to it was already processed. (com.github.mmolimar.kafka.connect.fs.policy.CronPolicy:294)

There are no error or warning logs on the Kafka broker.
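Putting the three runs side by side, the duplicate count lines up with offset.flush.interval.ms (60000 ms): the file appears to be re-read on every cron fire until the source offsets have been flushed, and only deleted on the fire after that. A back-of-envelope check of that pattern (my own speculation fitted to the logs above, not the connector's actual logic):

```python
import math

FLUSH_MS = 60_000  # offset.flush.interval.ms default

def observed_reads(cron_interval_ms: int, flush_ms: int = FLUSH_MS) -> int:
    """Reads seen before deletion: one initial read, plus one re-read per
    cron fire that lands before the first offset flush completes."""
    return math.ceil(flush_ms / cron_interval_ms) + 1

for interval in (30_000, 60_000, 300_000):
    print(f"cron every {interval // 1000}s -> {observed_reads(interval)} reads")
```

This reproduces the counts from the three experiments (3, 2, and 2 reads), but it is only a pattern; I don't know whether the implementation actually works this way.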
