package org.openimaj.rdf.storm.eddying.stems;

import backtype.storm.task.OutputCollector;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.hp.hpl.jena.graph.Graph;
import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.graph.compose.MultiUnion;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.openimaj.rdf.storm.eddying.routing.StormGraphRouter;
import org.openimaj.rdf.storm.eddying.stems.StormSteMBolt;
import org.openimaj.rdf.storm.topology.bolt.StormReteBolt;
import org.openimaj.rdf.storm.topology.logging.LoggerBolt;
import org.openimaj.rdf.storm.utils.CircularPriorityWindow;

/* loaded from: input_file:org/openimaj/rdf/storm/eddying/stems/StormSteMQueue.class */
public class StormSteMQueue implements CircularPriorityWindow.DurationOverflowHandler<Tuple> {
    protected static final Logger logger = Logger.getLogger(StormSteMQueue.class);
    private static final boolean logging = false;
    private LoggerBolt.LogEmitter logStream;
    private final CircularPriorityWindow<Tuple> window;
    private long timestampLimit;
    private int varCount;
    protected StormGraphRouter router;

    public StormSteMQueue(int i, int i2, long j, TimeUnit timeUnit, OutputCollector outputCollector) {
        this.varCount = i;
        this.window = new CircularPriorityWindow<>(this, i2, j, timeUnit);
        this.timestampLimit = Long.MAX_VALUE;
    }

    public void setGraphRouter(StormGraphRouter stormGraphRouter) {
        this.router = stormGraphRouter;
    }

    public StormSteMQueue(int i, int i2, long j, TimeUnit timeUnit, OutputCollector outputCollector, StormGraphRouter stormGraphRouter) {
        this(i, i2, j, timeUnit, outputCollector);
        setGraphRouter(stormGraphRouter);
    }

    public void build(Tuple tuple, boolean z, long j) {
        if (z) {
            this.window.offer((CircularPriorityWindow<Tuple>) tuple);
            if (j < this.timestampLimit) {
                this.timestampLimit = j;
            }
        } else {
            this.window.remove(tuple);
        }
        this.router.routeGraph(tuple, StormGraphRouter.Action.check, z, (Graph) tuple.getValueByField(StormSteMBolt.Component.graph.toString()), ((Long) tuple.getValueByField(StormSteMBolt.Component.timestamp.toString())).longValue());
    }

    public void check(Tuple tuple, boolean z, long j) {
        this.router.routeGraph(tuple, StormGraphRouter.Action.probe, z, (Graph) tuple.getValueByField(StormSteMBolt.Component.graph.toString()), j);
    }

    public void probe(Tuple tuple, boolean z, long j) {
        List values = tuple.getValues();
        logger.debug("\nChecking new tuple values: " + StormReteBolt.cleanString(values));
        logger.debug("\nComparing new tuple to " + this.window.size() + " other tuples");
        boolean z2 = false;
        Iterator<Tuple> it = this.window.iterator();
        while (it.hasNext()) {
            Tuple next = it.next();
            if (j < next.getLongByField(StormSteMBolt.Component.timestamp.toString()).longValue()) {
                break;
            }
            boolean z3 = true;
            int i = 0;
            while (true) {
                if (i < this.varCount) {
                    Node node = (Node) values.get(i);
                    if (!node.sameValueAs((Node) next.getValue(i)) && !node.isVariable()) {
                        z3 = false;
                        break;
                    }
                    i++;
                } else {
                    break;
                }
            }
            if (z3) {
                z2 = true;
                Graph joinSubGraphs = joinSubGraphs(tuple, next);
                logger.debug("\nMatch Found! preparing for emit!\n" + joinSubGraphs.toString());
                this.router.routeGraph(tuple, StormGraphRouter.Action.probe, z, joinSubGraphs, j, next.getLongByField(StormReteBolt.Component.timestamp.toString()).longValue());
            }
        }
        if (z2) {
            return;
        }
        logger.debug(String.format("\nCould not match partially complete graph: %s\nTook %s milliseconds.", tuple.getValueByField(StormReteBolt.Component.graph.toString()).toString(), Long.valueOf(new Date().getTime() - j)));
    }

    protected static Graph joinSubGraphs(Tuple tuple, Tuple tuple2) {
        MultiUnion multiUnion = new MultiUnion();
        multiUnion.addGraph((Graph) tuple.getValueByField(StormReteBolt.Component.graph.toString()));
        multiUnion.addGraph((Graph) tuple2.getValueByField(StormReteBolt.Component.graph.toString()));
        return multiUnion;
    }

    @Override // org.openimaj.rdf.storm.utils.CircularPriorityWindow.OverflowHandler
    public void handleCapacityOverflow(Tuple tuple) {
        logger.debug("Window capacity exceeded.");
    }

    @Override // org.openimaj.rdf.storm.utils.CircularPriorityWindow.DurationOverflowHandler
    public void handleDurationOverflow(Tuple tuple) {
        logger.debug("Tuple exceeded age of window.");
        new Values().addAll(tuple.getValues());
    }
}
