package org.openimaj.rdf.storm.sparql.topology.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import com.hp.hpl.jena.graph.Graph;
import com.hp.hpl.jena.graph.Triple;
import com.hp.hpl.jena.query.ARQ;
import com.hp.hpl.jena.query.Query;
import com.hp.hpl.jena.reasoner.rulesys.impl.RETERuleContext;
import com.hp.hpl.jena.sparql.core.DatasetGraphMap;
import com.hp.hpl.jena.sparql.core.VarExprList;
import com.hp.hpl.jena.sparql.engine.ExecutionContext;
import com.hp.hpl.jena.sparql.engine.QueryIterator;
import com.hp.hpl.jena.sparql.engine.binding.Binding;
import com.hp.hpl.jena.sparql.engine.iterator.QueryIterGroup;
import com.hp.hpl.jena.sparql.engine.iterator.QueryIterPlainWrapper;
import com.hp.hpl.jena.sparql.expr.Expr;
import com.hp.hpl.jena.sparql.expr.ExprAggregator;
import com.hp.hpl.jena.sparql.function.FunctionEnvBase;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.mortbay.io.RuntimeIOException;
import org.openimaj.io.IOUtils;
import org.openimaj.rdf.storm.bolt.RETEStormNode;
import org.openimaj.rdf.storm.utils.CircularPriorityWindow;

/* loaded from: input_file:org/openimaj/rdf/storm/sparql/topology/bolt/StormSPARQLReteConflictSetBolt.class */
/**
 * Terminal bolt of a SPARQL RETE topology: receives fully joined tuples,
 * converts each one to a {@link Binding}, applies the query's aggregators and
 * HAVING expressions (when the query declares any), and passes the resulting
 * solutions to {@link #handleBinding(QueryIterator)}.
 *
 * <p>Output is ultimately delivered to a {@link StormSPARQLReteConflictSetBoltSink}.
 * The sink is stored in serialised form by {@link #setSink} and only
 * re-materialised in {@link #prepare()}.
 */
public abstract class StormSPARQLReteConflictSetBolt extends StormSPARQLReteBolt {
    private static final Logger logger = Logger.getLogger(StormSPARQLReteConflictSetBolt.class);
    private static final long serialVersionUID = 5248125498316607622L;

    /** Aggregators declared by the query; populated in {@link #prepare()}. */
    private List<ExprAggregator> aggregators;
    /** HAVING expressions declared by the query; populated in {@link #prepare()}. */
    private List<Expr> having;
    /** GROUP BY variables/expressions of the query; populated in {@link #prepare()}. */
    private VarExprList groupBy;
    /** Bindings seen so far, re-grouped on every tuple when the query aggregates. */
    private Collection<Binding> bindingsQueue;
    /** Function environment used to evaluate HAVING expressions; rebuilt per tuple. */
    private FunctionEnvBase execCxt;
    /** Live sink, deserialised from {@link #sinkBytes} in {@link #prepare()}. */
    private StormSPARQLReteConflictSetBoltSink sink;
    /** Serialised form of the sink, written by {@link #setSink}. */
    private byte[] sinkBytes;

    /**
     * Destination for the results produced by a conflict set bolt.
     */
    public interface StormSPARQLReteConflictSetBoltSink {
        /**
         * Called once from {@link StormSPARQLReteConflictSetBolt#prepare()},
         * right after the sink has been deserialised.
         *
         * @param stormSPARQLReteConflictSetBolt the bolt that owns this sink
         */
        void instantiate(StormSPARQLReteConflictSetBolt stormSPARQLReteConflictSetBolt);

        /**
         * Receives a triple passed to {@link StormSPARQLReteConflictSetBolt#emitTriple(Triple)}.
         *
         * @param triple the emitted triple
         */
        void consumeTriple(Triple triple);

        /**
         * Receives the solutions passed to
         * {@link StormSPARQLReteConflictSetBolt#emitSolutions(QueryIterator)}.
         *
         * @param queryIterator iterator over the emitted solutions
         */
        void consumeBindings(QueryIterator queryIterator);

        /** Called from {@link StormSPARQLReteConflictSetBolt#cleanup()}. */
        void close();
    }

    /**
     * @param query the SPARQL query this bolt finalises
     */
    public StormSPARQLReteConflictSetBolt(Query query) {
        super(query);
    }

    /**
     * Cloning is not supported by this node.
     *
     * @return always {@code null}
     */
    @Override // org.openimaj.rdf.storm.bolt.RETEStormNode
    public RETEStormNode clone(Map<RETEStormNode, RETEStormNode> map, RETERuleContext rETERuleContext) {
        return null;
    }

    /**
     * Processes one incoming tuple: converts it to a binding, aggregates and
     * filters it as the query requires, forwards the surviving solutions to
     * {@link #handleBinding(QueryIterator)} and finally acknowledges the tuple.
     *
     * @param tuple the incoming tuple
     */
    public void execute(Tuple tuple) {
        logger.debug("Conflict set Tuple: " + tuple);
        // Extract the graph once and use it as both the active graph and the dataset's graph.
        // NOTE(review): assumes extractGraph has no side effects — confirm against the parent class.
        final Graph graph = extractGraph(tuple);
        this.execCxt = new FunctionEnvBase(ARQ.getContext(), graph, new DatasetGraphMap(graph));
        final Binding binding = tupleToBinding(tuple);
        logger.debug("Conflict set extracted binding: " + binding);

        // Declared as the common supertype: the value is either a plain wrapper
        // or a grouping iterator depending on whether the query aggregates.
        QueryIterator solutions;
        if (this.aggregators.isEmpty()) {
            solutions = new QueryIterPlainWrapper(Arrays.asList(binding).iterator());
        } else {
            updateBindings(binding);
            solutions = updateAggregators();
        }
        if (!this.having.isEmpty()) {
            solutions = checkHaving(solutions);
        }
        handleBinding(solutions);
        acknowledge(tuple);
    }

    /**
     * Keeps only the solutions that satisfy every HAVING expression of the
     * query. Each surviving solution appears exactly once in the result.
     *
     * @param queryIterator the candidate solutions; fully consumed by this call
     * @return an iterator over the solutions that passed all HAVING expressions
     */
    private QueryIterator checkHaving(QueryIterator queryIterator) {
        final List<Binding> accepted = new ArrayList<Binding>();
        while (queryIterator.hasNext()) {
            final Binding candidate = queryIterator.next();
            if (satisfiesHaving(candidate)) {
                accepted.add(candidate);
            }
        }
        return new QueryIterPlainWrapper(accepted.iterator());
    }

    /**
     * Tests a solution against all HAVING expressions. Multiple HAVING
     * conditions act as successive filters, so all of them must hold.
     */
    private boolean satisfiesHaving(Binding candidate) {
        for (Expr condition : this.having) {
            if (!condition.isSatisfied(candidate, this.execCxt)) {
                return false;
            }
        }
        return true;
    }

    /** Records a binding so that it takes part in subsequent aggregations. */
    private void updateBindings(Binding binding) {
        this.bindingsQueue.add(binding);
    }

    /**
     * Consumes the solutions produced for one incoming tuple.
     *
     * @param queryIterator the solutions remaining after aggregation and HAVING
     */
    public abstract void handleBinding(QueryIterator queryIterator);

    /**
     * Initialises the per-instance state: the bindings window, the query's
     * aggregators / HAVING / GROUP BY, and the sink (deserialised from the
     * bytes stored by {@link #setSink}).
     *
     * @throws IllegalStateException if no sink was set before this call
     * @throws RuntimeIOException    if the sink cannot be deserialised
     */
    @Override // org.openimaj.rdf.storm.topology.bolt.StormReteBolt
    public void prepare() {
        // NOTE(review): the numeric arguments are presumably the window capacity (5000)
        // and a 36000-second lifetime — confirm against CircularPriorityWindow.
        this.bindingsQueue = new CircularPriorityWindow<Binding>(new CircularPriorityWindow.OverflowHandler<Binding>() { // from class: org.openimaj.rdf.storm.sparql.topology.bolt.StormSPARQLReteConflictSetBolt.1
            @Override // org.openimaj.rdf.storm.utils.CircularPriorityWindow.OverflowHandler
            public void handleCapacityOverflow(Binding binding) {
                StormSPARQLReteConflictSetBolt.this.logStream.emit("Binding overflowing! Binding removed", binding);
                StormSPARQLReteConflictSetBolt.logger.debug("Binding overflowing! Binding removed");
            }
        }, 5000, 36000L, TimeUnit.SECONDS);

        final Query query = getQuery();
        this.aggregators = query.getAggregators();
        this.having = query.getHavingExprs();
        this.groupBy = query.getGroupBy();

        if (this.sinkBytes == null) {
            // Fail with a clear message instead of an NPE inside ByteArrayInputStream.
            throw new IllegalStateException("No sink has been set on this bolt; call setSink before prepare");
        }
        try {
            this.sink = (StormSPARQLReteConflictSetBoltSink) IOUtils.read(new DataInputStream(new ByteArrayInputStream(this.sinkBytes)));
            this.sink.instantiate(this);
        } catch (IOException e) {
            throw new RuntimeIOException(e);
        }
    }

    /**
     * Builds a grouping iterator over every binding currently held in the
     * window, using the query's GROUP BY and aggregators.
     *
     * @return a grouping iterator; it is created with a {@code null} execution context
     */
    protected QueryIterGroup updateAggregators() {
        return new QueryIterGroup(new QueryIterPlainWrapper(this.bindingsQueue.iterator()), this.groupBy, this.aggregators, (ExecutionContext) null);
    }

    /**
     * Creates the conflict set bolt matching the query form and attaches the
     * given sink to it.
     *
     * @param query                              a SELECT or CONSTRUCT query
     * @param stormSPARQLReteConflictSetBoltSink the sink that will receive results
     * @return the new bolt, with its sink already set
     * @throws IllegalArgumentException if the query is neither SELECT nor CONSTRUCT
     */
    public static StormSPARQLReteConflictSetBolt construct(Query query, StormSPARQLReteConflictSetBoltSink stormSPARQLReteConflictSetBoltSink) {
        final StormSPARQLReteConflictSetBolt bolt;
        if (query.isSelectType()) {
            bolt = new StormSPARQLReteSelectConflictSetBolt(query);
        } else if (query.isConstructType()) {
            bolt = new StormSPARQLReteConstructConflictSetBolt(query);
        } else {
            // Previously fell through to a NullPointerException on setSink.
            throw new IllegalArgumentException("Unsupported query type: only SELECT and CONSTRUCT queries are supported");
        }
        bolt.setSink(stormSPARQLReteConflictSetBoltSink);
        return bolt;
    }

    /**
     * Stores the sink in serialised form; the live instance is recreated from
     * these bytes in {@link #prepare()}.
     *
     * @param stormSPARQLReteConflictSetBoltSink the sink to serialise
     * @throws RuntimeIOException if the sink cannot be serialised
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void setSink(StormSPARQLReteConflictSetBoltSink stormSPARQLReteConflictSetBoltSink) {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try {
            IOUtils.write(stormSPARQLReteConflictSetBoltSink, new DataOutputStream(buffer));
            this.sinkBytes = buffer.toByteArray();
        } catch (IOException e) {
            throw new RuntimeIOException(e);
        }
    }

    /**
     * Forwards a triple to the sink.
     *
     * @param triple the triple to emit
     */
    public void emitTriple(Triple triple) {
        logger.debug("Emitting triple: " + triple);
        this.sink.consumeTriple(triple);
    }

    /**
     * Forwards a set of solutions to the sink.
     *
     * @param queryIterator iterator over the solutions to emit
     */
    public void emitSolutions(QueryIterator queryIterator) {
        logger.debug("Emitting iterator solutions!");
        this.sink.consumeBindings(queryIterator);
    }

    /**
     * @return the output collector held by this bolt
     */
    public OutputCollector getCollector() {
        return this.collector;
    }

    /**
     * @return the topology context held by this bolt
     */
    public TopologyContext getContext() {
        return this.context;
    }

    /**
     * Runs the parent clean-up and then closes the sink, if one was
     * instantiated by {@link #prepare()}.
     */
    @Override
    public void cleanup() {
        super.cleanup();
        // The sink only exists once prepare() has completed successfully.
        if (this.sink != null) {
            this.sink.close();
        }
    }
}
