Storm source code analysis (8)

null_ wry 2022-02-13 07:59:11 阅读数:851

storm source code analysis


[email protected]

WorkerTransfer

tryTransferRemote()

public boolean tryTransferRemote(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {

if (pendingEmits != null && !pendingEmits.isEmpty()) {

pendingEmits.add(addressedTuple);
return false;
}
if (!remoteBackPressureStatus[addressedTuple.dest].get()) {

TaskMessage tm = new TaskMessage(addressedTuple.getDest(), serializer.serialize(addressedTuple.getTuple()));
if (transferQueue.tryPublish(tm)) {

return true;
}
} else {

LOG.debug("Noticed Back Pressure in remote task {}", addressedTuple.dest);
}
if (pendingEmits != null) {

pendingEmits.add(addressedTuple);
}
return false;
}

This is not a blocking call . If not , Will add ’tuple’ To ’ pendingemit ‘ And back to ’false’.’ pendingemit ' Can be null .
Here you can see how to create TaskMessage When , Use serializer.serialize(addressedTuple.getTuple()) Yes tuple Serialized ; The serializer by ITupleSerializer type , Its implementation class is KryoTupleSerializer.

WorkerTokenInfo

_Fields

public enum _Fields implements org.apache.storm.thrift.TFieldIdEnum {

USER_NAME((short)1, "userName"),
TOPOLOGY_ID((short)2, "topologyId"),
SECRET_VERSION((short)3, "secretVersion"),
EXPIRATION_TIME_MILLIS((short)4, "expirationTimeMillis");
private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
static {

for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {

byName.put(field.getFieldName(), field);
}
}

The structure contains a set of fields , And convenient ways to find and operate them .

findByThriftId

@org.apache.storm.thrift.annotation.Nullable
public static _Fields findByThriftId(int fieldId) {

switch(fieldId) {

case 1: // USER_NAME
return USER_NAME;
case 2: // TOPOLOGY_ID
return TOPOLOGY_ID;
case 3: // SECRET_VERSION
return SECRET_VERSION;
case 4: // EXPIRATION_TIME_MILLIS
return EXPIRATION_TIME_MILLIS;
default:
return null;
}
}

Find a match fieldId Of _Fields Constant , If not found, it is null.

findByThriftIdOrThrow

public static _Fields findByThriftIdOrThrow(int fieldId) {

_Fields fields = findByThriftId(fieldId);
if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
return fields;
}

Find a match fieldId Of _Fields Constant , If not found , Throw an exception .

findByName

 @org.apache.storm.thrift.annotation.Nullable
public static _Fields findByName(java.lang.String name) {

return byName.get(name);
}
private final short _thriftId;
private final java.lang.String _fieldName;
_Fields(short thriftId, java.lang.String fieldName) {

_thriftId = thriftId;
_fieldName = fieldName;
}
public short getThriftFieldId() {

return _thriftId;
}
public java.lang.String getFieldName() {

return _fieldName;
}
}

Find and name Matching _Fields Constant , If not found, it is null.

copyright:author[null_ wry],Please bring the original link to reprint, thank you. https://en.javamana.com/2022/02/202202130759097277.html