Skip to content

Commit

Permalink
* Refactored Document and TimingMetadata to not extend Attribute
Browse files Browse the repository at this point in the history
* Simplified any logic looking for nested documents
* Ensure everything is actually serializable
* Made the projection testing a little more robust
* Updated to avoid creating a copy of the document
  • Loading branch information
ivakegg committed Nov 22, 2023
1 parent 6d6ed09 commit e46c7a1
Show file tree
Hide file tree
Showing 17 changed files with 639 additions and 537 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.log4j.Logger;
Expand All @@ -26,136 +25,63 @@
public abstract class Attribute<T extends Comparable<T>> implements WritableComparable<T>, KryoSerializable {

private static final Logger log = Logger.getLogger(Attribute.class);
private static final Text EMPTY_TEXT = new Text();

/**
* The metadata for this attribute. Only the column visibility and timestamp are preserved in this metadata when serializing and deserializing.
* However, more information (e.g. the document key) can be maintained in this field for local use.
*/
protected Key metadata = null;
protected AttributeMetadata metadata = new AttributeMetadata();
protected boolean toKeep = true; // a flag denoting whether this attribute is to be kept in the returned results (transient or not)
protected boolean fromIndex = true; // Assume attributes are from the index unless specified otherwise.

public Attribute() {}

public Attribute(Key metadata, boolean toKeep) {
public Attribute(boolean toKeep) {
this.toKeep = toKeep;
setMetadata(metadata);
}

public boolean isMetadataSet() {
return (metadata != null);
public Attribute(Key key, boolean toKeep) {
this.toKeep = toKeep;
metadata.setMetadata(key);
}

public ColumnVisibility getColumnVisibility() {
if (isMetadataSet()) {
return metadata.getColumnVisibilityParsed();
}
return Constants.EMPTY_VISIBILITY;
/**
* Reports whether metadata has been set, as answered by the underlying {@code AttributeMetadata}.
*
* @return the result of {@code metadata.isMetadataSet()}
*/
public boolean isMetadataSet() {
return metadata.isMetadataSet();
}

public void setColumnVisibility(ColumnVisibility columnVisibility) {
if (isMetadataSet()) {
metadata = new Key(metadata.getRow(), metadata.getColumnFamily(), metadata.getColumnQualifier(), columnVisibility, metadata.getTimestamp());
} else {
metadata = new Key(EMPTY_TEXT, EMPTY_TEXT, EMPTY_TEXT, columnVisibility, -1);
}
/**
* Returns the metadata key held by the underlying {@code AttributeMetadata}.
*
* @return the result of {@code metadata.getMetadata()}
*/
public Key getMetadata() {
return metadata.getMetadata();
}

public long getTimestamp() {
if (isMetadataSet()) {
return metadata.getTimestamp();
}
return -1;
/**
* Sets the metadata from the given key by delegating to the underlying {@code AttributeMetadata}.
*
* @param key
*            the key to pass to {@code metadata.setMetadata(Key)}
*/
public void setMetadata(Key key) {
metadata.setMetadata(key);
}

public void setTimestamp(long ts) {
if (isMetadataSet()) {
metadata = new Key(metadata.getRow(), metadata.getColumnFamily(), metadata.getColumnQualifier(), metadata.getColumnVisibility(), ts);
} else {
metadata = new Key(EMPTY_TEXT, EMPTY_TEXT, EMPTY_TEXT, Constants.EMPTY_VISIBILITY, ts);
}
/**
* Sets the column visibility and timestamp together by delegating to the underlying {@code AttributeMetadata}.
*
* @param vis
*            the column visibility to apply
* @param ts
*            the timestamp to apply
*/
public void setMetadata(ColumnVisibility vis, long ts) {
metadata.setMetadata(vis, ts);
}

/*
*
* Set the metadata. This should only be set here or from extended classes.
*/
protected void setMetadata(ColumnVisibility vis, long ts) {
if (isMetadataSet()) {
metadata = new Key(metadata.getRow(), metadata.getColumnFamily(), metadata.getColumnQualifier(), vis, ts);
} else {
metadata = new Key(EMPTY_TEXT, EMPTY_TEXT, EMPTY_TEXT, vis, ts);
}
}

private static final ByteSequence EMPTY_BYTE_SEQUENCE = new ArrayByteSequence(new byte[0]);

/*
* Given a key, set the metadata. Expected input keys can be an event key, an fi key, or a tf key. Expected metadata is row=shardid, cf = type\0uid; cq =
* empty; cv, ts left as is.
*/
protected void setMetadata(Key key) {
if (key == null) {
this.metadata = null;
} else {
// convert the key to the form shard type\0uid cv, ts. Possible inputs are an event key, a fi key, or a tf key
final ByteSequence row = key.getRowData(), cf = key.getColumnFamilyData(), cv = key.getColumnVisibilityData();
if (isFieldIndex(cf)) {
// find the first null byte in the cq and take everything after that (cq = Normalized Field Value\0Data Type\0UID)
final ByteSequence cq = key.getColumnQualifierData();
int nullOffset = 0;
for (int i = 0; i < cq.length(); i++) {
if (cq.byteAt(i) == '\0') {
nullOffset = i;
break;
}
}
this.metadata = new Key(row.getBackingArray(), row.offset(), row.length(), cq.getBackingArray(), nullOffset + 1, cq.length() - (nullOffset + 1),
EMPTY_BYTE_SEQUENCE.getBackingArray(), EMPTY_BYTE_SEQUENCE.offset(), EMPTY_BYTE_SEQUENCE.length(), cv.getBackingArray(),
cv.offset(), cv.length(), key.getTimestamp());
} else if (isTermFrequency(cf)) {
// find the second null byte in the cq and take everything before that (cq = DataType\0UID\0Normalized Field Value\0Field Name)
final ByteSequence cq = key.getColumnQualifierData();
int nullOffset = 0;
int count = 0;
for (int i = 0; i < cf.length(); i++) {
if (cf.byteAt(i) == '\0') {
count++;
if (count == 2) {
nullOffset = i;
break;
}
}
}
this.metadata = new Key(row.getBackingArray(), row.offset(), row.length(), cq.getBackingArray(), cq.offset(), nullOffset,
EMPTY_BYTE_SEQUENCE.getBackingArray(), EMPTY_BYTE_SEQUENCE.offset(), EMPTY_BYTE_SEQUENCE.length(), cv.getBackingArray(),
cv.offset(), cv.length(), key.getTimestamp());
} else {
this.metadata = new Key(row.getBackingArray(), row.offset(), row.length(), cf.getBackingArray(), cf.offset(), cf.length(),
EMPTY_BYTE_SEQUENCE.getBackingArray(), EMPTY_BYTE_SEQUENCE.offset(), EMPTY_BYTE_SEQUENCE.length(), cv.getBackingArray(),
cv.offset(), cv.length(), key.getTimestamp());
}
}
/**
* Returns the column visibility reported by the underlying {@code AttributeMetadata}.
*
* @return the result of {@code metadata.getColumnVisibility()}
*/
public ColumnVisibility getColumnVisibility() {
return metadata.getColumnVisibility();
}

protected boolean isFieldIndex(ByteSequence cf) {
return (cf.length() >= 3 && cf.byteAt(0) == 'f' && cf.byteAt(1) == 'i' && cf.byteAt(2) == '\0');
/**
* Sets the column visibility on the underlying {@code AttributeMetadata}.
*
* @param vis
*            the column visibility to apply
*/
public void setColumnVisibility(ColumnVisibility vis) {
metadata.setColumnVisibility(vis);
}

protected boolean isTermFrequency(ByteSequence cf) {
return (cf.length() == 2 && cf.byteAt(0) == 't' && cf.byteAt(1) == 'f');
/**
* Returns the timestamp reported by the underlying {@code AttributeMetadata}.
*
* @return the result of {@code metadata.getTimestamp()}
*/
public long getTimestamp() {
return metadata.getTimestamp();
}

public Key getMetadata() {
return metadata;
/**
* Sets the timestamp on the underlying {@code AttributeMetadata}.
*
* @param ts
*            the timestamp to apply
*/
public void setTimestamp(long ts) {
metadata.setTimestamp(ts);
}

/**
* Unset the metadata. This should only be called from here or from extending classes.
*/
protected void clearMetadata() {
metadata = null;
metadata.setMetadata(null);
}

protected void writeMetadata(DataOutput out, Boolean reducedResponse) throws IOException {
Expand Down Expand Up @@ -249,17 +175,7 @@ public int size() {
}

private long getMetadataSizeInBytes() {
long size = 0;
if (isMetadataSet()) {
size += roundUp(33);
// 33 is object overhead, 4 array refs, 1 long and 1 boolean
size += roundUp(metadata.getRowData().length() + 12);
size += roundUp(metadata.getColumnFamilyData().length() + 12);
size += roundUp(metadata.getColumnQualifierData().length() + 12);
size += roundUp(metadata.getColumnVisibilityData().length() + 12);
// 12 is array overhead
}
return size;
return metadata.getSizeInBytes();
}

public long sizeInBytes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,56 @@
import java.io.Serializable;
import java.util.Collection;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.log4j.Logger;

import com.google.common.collect.Sets;

import datawave.marking.MarkingFunctions;
import datawave.marking.MarkingFunctions.Exception;
import datawave.marking.MarkingFunctionsFactory;

public abstract class AttributeBag<T extends Comparable<T>> extends Attribute<T> implements Serializable {

private static final long serialVersionUID = 1L;
private static final Logger log = Logger.getLogger(AttributeBag.class);
protected long shardTimestamp = Long.MAX_VALUE;
protected boolean validMetadata = false;
public class AttributeBagMetadata extends AttributeMetadata implements Serializable {
private static final long serialVersionUID = -1;

private static final Logger log = Logger.getLogger(AttributeBagMetadata.class);
private static final long ONE_DAY_MS = 1000l * 60 * 60 * 24;

protected static final MarkingFunctions markingFunctions = MarkingFunctionsFactory.createMarkingFunctions();
private static final MarkingFunctions markingFunctions = MarkingFunctionsFactory.createMarkingFunctions();

public MarkingFunctions getMarkingFunctions() {
return markingFunctions;
}
private long shardTimestamp = Long.MAX_VALUE;

protected AttributeBag() {
this(true);
/**
* Supplies the attributes that this metadata reads when it recomputes the combined column visibility and timestamp.
*/
public interface AttributesGetter {
Collection<Attribute<? extends Comparable<?>>> getAttributes();
}

public AttributeBag(boolean toKeep) {
super(null, toKeep);
private final AttributesGetter attributes;

private boolean validMetadata = false;

/**
* Creates bag metadata that obtains its member attributes through the given getter.
*
* @param attributes
*            the source of the attributes consulted when the metadata is recomputed
*/
public AttributeBagMetadata(AttributesGetter attributes) {
this.attributes = attributes;
}

public AttributeBag(Key metadata, boolean toKeep) {
super(metadata, toKeep);
/**
* Reports whether the metadata is usable as is.
*
* @return true only when the validity flag is set and {@code isMetadataSet()} is true
*/
public boolean isValidMetadata() {
return (this.validMetadata && isMetadataSet());
}

public void invalidateMetadata() {
this.validMetadata = false;
setValidMetadata(false);
}

public boolean isValidMetadata() {
return (this.validMetadata && isMetadataSet());
/**
* Sets the flag that records whether the current metadata is valid.
*
* @param valid
*            the new value of the validity flag
*/
public void setValidMetadata(boolean valid) {
this.validMetadata = valid;
}

/**
* Returns the shard timestamp.
*
* @return the current shard timestamp; the field is initialized to {@code Long.MAX_VALUE}
*/
public long getShardTimestamp() {
return shardTimestamp;
}

public abstract Collection<Attribute<? extends Comparable<?>>> getAttributes();
/**
* Sets the shard timestamp.
*
* @param shardTimestamp
*            the new shard timestamp
*/
public void setShardTimestamp(long shardTimestamp) {
this.shardTimestamp = shardTimestamp;
}

@Override
public long getTimestamp() {
Expand All @@ -59,7 +62,6 @@ public long getTimestamp() {
return super.getTimestamp();
}

@Override
public ColumnVisibility getColumnVisibility() {
if (isValidMetadata() == false)
this.updateMetadata();
Expand All @@ -70,35 +72,35 @@ private void updateMetadata() {
long ts = updateTimestamps();
ColumnVisibility vis = super.getColumnVisibility();
try {
vis = this.combineAndSetColumnVisibilities(getAttributes());
} catch (Exception e) {
vis = this.combineAndSetColumnVisibilities(attributes.getAttributes());
} catch (MarkingFunctions.Exception e) {
log.error("got error combining visibilities", e);
}
setMetadata(vis, ts);
validMetadata = true;
setValidMetadata(true);
}

protected ColumnVisibility combineAndSetColumnVisibilities(Collection<Attribute<? extends Comparable<?>>> attributes) throws Exception {
protected ColumnVisibility combineAndSetColumnVisibilities(Collection<Attribute<? extends Comparable<?>>> attributes) throws MarkingFunctions.Exception {
Collection<ColumnVisibility> columnVisibilities = Sets.newHashSet();
for (Attribute<?> attr : attributes) {
columnVisibilities.add(attr.getColumnVisibility());
}
return AttributeBag.markingFunctions.combine(columnVisibilities);
return markingFunctions.combine(columnVisibilities);
}

private long updateTimestamps() {
MutableLong ts = new MutableLong(Long.MAX_VALUE);
for (Attribute<?> attribute : getAttributes()) {
for (Attribute<?> attribute : attributes.getAttributes()) {
mergeTimestamps(attribute, ts);
}
return ts.longValue();
}

private void mergeTimestamps(Attribute<?> other, MutableLong ts) {
// If this is a set of attributes, then examine each one. Note: we do not recurse on a Document, as it should have already applied the shard time.
if (other instanceof AttributeBag) {
if (other instanceof Attributes) {
// recurse on the sub attributes
for (Attribute<?> attribute : ((AttributeBag<?>) other).getAttributes()) {
for (Attribute<?> attribute : ((Attributes) other).getAttributes()) {
mergeTimestamps(attribute, ts);
}
} else if (other.isMetadataSet()) {
Expand Down Expand Up @@ -143,9 +145,4 @@ private void mergeTimestamps(Attribute<?> other, MutableLong ts) {
}
}

@Override
public void setToKeep(boolean toKeep) {
super.setToKeep(toKeep);
// do not change values of child attributes to avoid overriding earlier decisions
}
}
Loading

0 comments on commit e46c7a1

Please sign in to comment.