/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class TimestampsAndPunctuatedWatermarksOperator<T>
extends AbstractUdfStreamOperator<T, AssignerWithPunctuatedWatermarks<T>>
implements OneInputStreamOperator<T, T> {
    private static final long serialVersionUID = 1L;
    private long currentWatermark = Long.MIN_VALUE;

    public TimestampsAndPunctuatedWatermarksOperator(AssignerWithPunctuatedWatermarks<T> assigner) {
        super(assigner);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        T value = element.getValue();
        long newTimestamp = ((AssignerWithPunctuatedWatermarks)this.userFunction).extractTimestamp(value, element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
        this.output.collect(element.replace(element.getValue(), newTimestamp));
        Watermark nextWatermark = ((AssignerWithPunctuatedWatermarks)this.userFunction).checkAndGetNextWatermark(value, newTimestamp);
        if (nextWatermark != null && nextWatermark.getTimestamp() > this.currentWatermark) {
            this.currentWatermark = nextWatermark.getTimestamp();
            this.output.emitWatermark(nextWatermark);
        }
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        if (mark.getTimestamp() == Long.MAX_VALUE && this.currentWatermark != Long.MAX_VALUE) {
            this.currentWatermark = Long.MAX_VALUE;
            this.output.emitWatermark(mark);
        }
    }
}

