Skip to content

graflo.architecture.pipeline.runtime.actor.edge_render

Edge creation and weight management for graph assembly.

add_blank_collections(ctx, vertex_conf)

Add blank collections for vertices that require them.

Source code in graflo/architecture/pipeline/runtime/actor/edge_render.py
def add_blank_collections(
    ctx: AssemblyContext | ActionContext, vertex_conf: VertexConfig
) -> AssemblyContext | ActionContext:
    """Seed ``ctx.acc_global`` with one document per missing blank vertex.

    All lists held in ``ctx.buffer_transforms`` are flattened, in dict order,
    into a single sequence of buffered items. For every name in
    ``vertex_conf.blank_vertices`` that is not yet a key of ``ctx.acc_global``,
    a one-element list is stored whose document is the buffered item
    restricted to that vertex's ``field_names``.

    Because the key exists after the first write, only the first buffered
    item ever contributes a document for a given blank vertex. When there are
    no buffered items nothing is added.

    Args:
        ctx: Context exposing ``buffer_transforms`` and ``acc_global``;
            ``acc_global`` is mutated in place.
        vertex_conf: Vertex configuration exposing ``blank_vertices`` and
            item access by vertex name.

    Returns:
        The same ``ctx`` object that was passed in.
    """
    # Flatten the per-key lists of buffered items into one sequence.
    buffered: list = []
    for batch in ctx.buffer_transforms.values():
        buffered.extend(batch)

    for blank_name in vertex_conf.blank_vertices:
        vertex = vertex_conf[blank_name]
        for record in buffered:
            # Project the buffered item onto the fields this vertex declares.
            projected = {}
            for field_name in vertex.field_names:
                if field_name in record:
                    projected[field_name] = record[field_name]
            if blank_name in ctx.acc_global:
                # Already populated (before the call or by an earlier item).
                continue
            ctx.acc_global[blank_name] = [projected]
    return ctx

count_unique_by_position_variable(tuples_list, fillvalue=None)

For each position in the tuples, returns the number of different elements.

Source code in graflo/architecture/pipeline/runtime/actor/edge_render.py
def count_unique_by_position_variable(tuples_list: list, fillvalue: Any = None) -> list:
    """Count the distinct values found at each tuple position.

    Tuples may have different lengths; shorter ones are padded with
    ``fillvalue``, and that padding value is counted like any other element.

    Args:
        tuples_list: Sequences of hashable elements, possibly of unequal length.
        fillvalue: Value used to pad sequences shorter than the longest one.

    Returns:
        A list whose i-th entry is the number of distinct values at position
        ``i``. Empty when ``tuples_list`` is empty.
    """
    distinct_counts: list = []
    if tuples_list:
        # zip_longest transposes rows into columns, padding the short rows.
        for column in zip_longest(*tuples_list, fillvalue=fillvalue):
            distinct_counts.append(len(set(column)))
    return distinct_counts

filter_nonindexed(items_tdressed, index)

Filter items to only include those with indexed fields.

Source code in graflo/architecture/pipeline/runtime/actor/edge_render.py
def filter_nonindexed(
    items_tdressed: defaultdict[LocationIndex, list[tuple[VertexRep, dict]]],
    index: Any,
) -> defaultdict[LocationIndex, list[tuple[VertexRep, dict]]]:
    """Drop entries whose vertex document contains none of the ``index`` keys.

    Each entry is a tuple whose first element exposes a ``vertex`` mapping. An
    entry is kept when at least one key from ``index`` is present in that
    mapping. The mapping passed in is modified in place: every location keeps
    its key, but its list is replaced by the filtered list.

    Args:
        items_tdressed: Entries grouped by location.
        index: Iterable of field names to look for in each vertex document.

    Returns:
        The same ``items_tdressed`` object, after filtering.
    """
    for location, entries in items_tdressed.items():
        kept = []
        for entry in entries:
            for key in index:
                if key in entry[0].vertex:
                    kept.append(entry)
                    break
        # Reassigning an existing key does not resize the dict, so it is safe
        # to do while iterating over items().
        items_tdressed[location] = kept
    return items_tdressed

render_edge(edge, vertex_config, ctx, lindex=None)

Create edges between source and target vertices.

Source code in graflo/architecture/pipeline/runtime/actor/edge_render.py
def render_edge(
    edge: Edge,
    vertex_config: VertexConfig,
    ctx: AssemblyContext | ActionContext,
    lindex: LocationIndex | None = None,
) -> defaultdict[str | None, list]:
    """Create edges between source and target vertices.

    Pairs the accumulated vertices of ``edge.source`` with those of
    ``edge.target`` and emits one edge tuple per pair, grouped by relation.

    Args:
        edge: Edge definition; this function reads ``source``, ``target``,
            ``weights``, ``relation_field``, ``relation_from_key`` and
            ``relation`` from it.
        vertex_config: Used to look up the identity fields of both endpoints.
        ctx: Context providing ``acc_vertex`` (vertices by name, then by
            location) and ``buffer_transforms``.
        lindex: Optional location index; when given, source and target
            locations are first restricted via ``lindex.filter`` and sorted.

    Returns:
        A ``defaultdict(list)`` mapping a relation (possibly ``None``) to a
        list of ``(source_projection, target_projection, weight)`` tuples.
        It is empty when either endpoint has no accumulated vertices or no
        locations survive filtering.
    """
    acc_vertex = ctx.acc_vertex
    buffer_transforms = ctx.buffer_transforms
    source = edge.source
    target = edge.target

    # Fields used both to filter candidate vertices and to project the
    # endpoint documents stored on each emitted edge.
    source_identity = vertex_config.identity_fields(source)
    target_identity = vertex_config.identity_fields(target)

    source_by_loc = acc_vertex[source]
    target_by_loc = acc_vertex[target]
    # Nothing to connect if either endpoint has no accumulated vertices.
    if not source_by_loc or not target_by_loc:
        return defaultdict(list)

    source_locs = list(source_by_loc)
    target_locs = list(target_by_loc)

    if lindex is not None:
        source_locs = sorted(lindex.filter(source_locs))
        target_locs = sorted(lindex.filter(target_locs))

    # Edge-specific narrowing of locations; the exact rule lives in the helper
    # (not visible here).
    source_locs, target_locs = _filter_source_target_lindexes(
        edge, source_locs, target_locs
    )

    if not (source_locs and target_locs):
        return defaultdict(list)

    # Keep only the vertices at the surviving locations.
    source_by_loc = defaultdict(list, {loc: source_by_loc[loc] for loc in source_locs})
    target_by_loc = defaultdict(list, {loc: target_by_loc[loc] for loc in target_locs})

    # Minimal depths are only consumed by _extract_relation_from_key below.
    source_min_depth = min(loc.depth() for loc in source_by_loc)
    target_min_depth = min(loc.depth() for loc in target_by_loc)

    # The dressed items are unpacked below as (vertex_rep, transform) pairs;
    # entries carrying none of the identity fields are then dropped.
    source_dressed = dress_vertices(source_by_loc, buffer_transforms)
    target_dressed = dress_vertices(target_by_loc, buffer_transforms)
    source_dressed = filter_nonindexed(source_dressed, source_identity)
    target_dressed = filter_nonindexed(target_dressed, target_identity)

    edges: defaultdict[str | None, list] = defaultdict(list)

    # Per-position count of distinct path elements; computed once when both
    # sides share the very same location list object.
    if source == target and source_locs is target_locs:
        path_spec = count_unique_by_position_variable([loc.path for loc in source_locs])
        source_path_spec = target_path_spec = path_spec
    else:
        source_path_spec = count_unique_by_position_variable(
            [loc.path for loc in source_locs]
        )
        target_path_spec = count_unique_by_position_variable(
            [loc.path for loc in target_locs]
        )

    source_groups, target_groups = _compute_location_groups(
        source_locs, target_locs, source_path_spec, target_path_spec
    )

    for source_group, target_group in _iter_emitter_receiver_group_pairs(
        source_groups, target_groups, edge, source, target
    ):
        for source_loc in source_group:
            source_items = source_dressed[source_loc]
            for target_loc in target_group:
                target_items = target_dressed[target_loc]

                # The casting decides which iterator pairs source items with
                # target items (semantics defined by the helpers).
                casting = _choose_casting(
                    source_loc,
                    target_loc,
                    source,
                    target,
                )
                iterator = select_iterator(casting)

                for (u_rep, u_tr), (v_rep, v_tr) in iterator(
                    source_items, target_items
                ):
                    u_doc = u_rep.vertex
                    v_doc = v_rep.vertex

                    # Direct weights: each lookup below overwrites the previous
                    # one, so the last container holding the field wins
                    # (u ctx < v ctx < u transform < v transform).
                    weight: dict[str, Any] = {}
                    if edge.weights is not None:
                        for field in edge.weights.direct:
                            field_name = field.name
                            if field in u_rep.ctx:
                                weight[field_name] = u_rep.ctx[field]
                            if field in v_rep.ctx:
                                weight[field_name] = v_rep.ctx[field]
                            if field in u_tr:
                                weight[field_name] = u_tr[field]
                            if field in v_tr:
                                weight[field_name] = v_tr[field]

                    source_proj = project_dict(u_doc, source_identity)
                    target_proj = project_dict(v_doc, target_identity)

                    extracted_relation = None

                    # Relation taken from the vertex context: the source side
                    # is tried first; if only the target side carries it, the
                    # edge direction is flipped by swapping the projections.
                    # NOTE(review): pop() removes the key from the rep's ctx,
                    # so later pairings involving the same rep will no longer
                    # find it -- confirm this is intended.
                    if edge.relation_field is not None:
                        u_relation = u_rep.ctx.pop(edge.relation_field, None)
                        if u_relation is None:
                            v_relation = v_rep.ctx.pop(edge.relation_field, None)
                            if v_relation is not None:
                                source_proj, target_proj = target_proj, source_proj
                                extracted_relation = v_relation
                        else:
                            extracted_relation = u_relation

                    # Fallback: derive the relation from the location keys,
                    # attempted only when the target location has length > 1.
                    if (
                        extracted_relation is None
                        and edge.relation_from_key
                        and len(target_loc) > 1
                    ):
                        extracted_relation = _extract_relation_from_key(
                            source_loc, target_loc, source_min_depth, target_min_depth
                        )

                    # With relation_from_key set, a pair without any derived
                    # relation emits no edge at all.
                    if edge.relation_from_key and extracted_relation is None:
                        continue

                    # Otherwise fall back to the statically configured relation.
                    relation = (
                        extracted_relation
                        if extracted_relation is not None
                        else edge.relation
                    )
                    edges[relation].append((source_proj, target_proj, weight))
    return edges

render_weights(edge, vertex_config, acc_vertex, edges)

Process and apply weights to edge documents.

Source code in graflo/architecture/pipeline/runtime/actor/edge_render.py
def render_weights(
    edge: Edge,
    vertex_config: VertexConfig,
    acc_vertex: defaultdict[str, defaultdict[LocationIndex, list]],
    edges: defaultdict[str | None, list],
) -> defaultdict[str | None, list]:
    """Merge vertex-derived weights into already rendered edge documents.

    For every vertex weight rule in ``edge.weights.vertices`` whose ``name``
    is a known vertex, the vertex documents stored at the smallest location
    key of ``acc_vertex[name]`` are taken as the sample. The sample is
    optionally narrowed by ``rule.filter`` (every ``field: value`` pair must
    match the document), and one weight dict is produced per remaining
    document:

    * ``rule.fields``: ``{rule.cfield(field): doc[field]}`` for fields present
      in the document;
    * ``rule.map``: ``{new_name: doc[old_name]}`` merged on top;
    * neither: ``{"<vertex>.<key>": doc[key]}`` for the vertex identity fields
      present in the document.

    The collected weight dicts form one flat list across all rules and are
    then merged positionally into the edge tuples of every relation.

    Args:
        edge: Edge definition; ``weights`` may be ``None``.
        vertex_config: Provides ``vertex_set`` and ``identity_fields``.
        acc_vertex: Accumulated vertex representations by vertex name and
            location.
        edges: Relation -> list of ``(source, target, weight)`` tuples;
            updated in place.

    Returns:
        The same ``edges`` mapping, with vertex weights merged in.
    """
    vertex_weights = [] if edge.weights is None else edge.weights.vertices
    weights: list = []

    for w in vertex_weights:
        vertex = w.name
        if vertex is None or vertex not in vertex_config.vertex_set:
            continue
        vertex_lists = acc_vertex[vertex]

        keys = sorted(vertex_lists)
        if not keys:
            continue
        # Only the documents at the first (smallest) location are sampled.
        vertex_sample = [item.vertex for item in vertex_lists[keys[0]]]

        if w.filter:
            # A document matches when it holds every filter field with the
            # requested value. The former `doc[q] == v in doc` was a chained
            # comparison (`doc[q] == v and v in doc`): it also demanded that
            # the *value* be a key of the document, and raised KeyError when
            # the filter field was absent.
            vertex_sample = [
                doc
                for doc in vertex_sample
                if all(q in doc and doc[q] == v for q, v in w.filter.items())
            ]

        for doc in vertex_sample:
            weight: dict = {}
            if w.fields:
                weight = {
                    w.cfield(field): doc[field] for field in w.fields if field in doc
                }
            if w.map:
                weight = {
                    **weight,
                    **{q: doc[k] for k, q in w.map.items()},
                }
            if not w.fields and not w.map:
                # No explicit selection: fall back to the identity fields.
                try:
                    weight = {
                        f"{vertex}.{k}": doc[k]
                        for k in vertex_config.identity_fields(vertex)
                        if k in doc
                    }
                except ValueError:
                    weight = {}
                    logger.error(
                        " weights mapper error : weight definition on"
                        f" {edge.source} {edge.target} refers to"
                        f" a non existent vcollection {vertex}"
                    )
            weights.append(weight)

    if weights:
        # NOTE(review): zip pairs edges and weights by position and stops at
        # the shorter sequence, so a relation holding more edges than there
        # are collected weights is truncated -- confirm this is intended.
        for relation, edocs in edges.items():
            edges[relation] = [
                (u, v, {**edge_weight, **vertex_weight})
                for (u, v, edge_weight), vertex_weight in zip(edocs, weights)
            ]
    return edges