feat: add the pending condition and status (#244)
* add the pending condition and status

* fix test and address comments

Co-authored-by: Ryan Zhang <[email protected]>
ryanzhang-oss and Ryan Zhang committed Aug 23, 2022
1 parent 0031f58 commit cd3bccb
Showing 6 changed files with 146 additions and 37 deletions.
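At a glance: the commit replaces the old two-state `Applied` condition on a `ClusterResourcePlacement` with three explicit states — `ApplySucceeded` (True) when every selected manifest has been applied, `ApplyPending` (Unknown) while member clusters are still processing manifests, and `ApplyFailed` (False) when any manifest reports an apply failure — and emits events only when the condition actually transitions. The decision lives in `Reconcile`; a condensed sketch of the new branch, taken from the hunk below, with the resulting condition noted in comments:

```go
// Condensed from placement_controller.go below; hasPending comes from
// collectAllManifestsStatus, which now also treats a not-yet-cached work as pending.
if !hasPending && len(placementNew.Status.FailedResourcePlacements) == 0 {
	r.updatePlacementAppliedCondition(placementNew, nil) // Applied=True, reason ApplySucceeded
} else if len(placementNew.Status.FailedResourcePlacements) == 0 {
	r.updatePlacementAppliedCondition(placementNew, ErrStillPendingManifest) // Applied=Unknown, reason ApplyPending
} else {
	r.updatePlacementAppliedCondition(placementNew, ErrFailedManifest) // Applied=False, reason ApplyFailed
}
```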
84 changes: 62 additions & 22 deletions pkg/controllers/clusterresourceplacement/placement_controller.go
@@ -10,6 +10,7 @@ import (
"fmt"
"time"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -28,13 +29,14 @@ import (
)

const (
eventReasonResourceScheduled = "ResourceScheduled"
eventReasonResourceApplied = "ResourceApplied"
ApplyFailedReason = "ApplyFailed"
ApplyPendingReason = "ApplyPending"
ApplySucceededReason = "ApplySucceeded"
)

var (
ErrStillPendingManifest = fmt.Errorf("there are still manifest pending to be processed by the member cluster")
ErrFailedManifest = fmt.Errorf("there are still failed to apply manifests")
ErrFailedManifest = fmt.Errorf("there are failed to apply manifests, please check the `failedResourcePlacements` status")
)

// Reconciler reconciles a cluster resource placement object
@@ -87,7 +89,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ct
selectedClusters, scheduleErr := r.selectClusters(placementNew)
if scheduleErr != nil {
klog.ErrorS(scheduleErr, "Failed to select the clusters", "placement", placeRef)
updatePlacementScheduledCondition(placementOld, scheduleErr)
r.updatePlacementScheduledCondition(placementOld, scheduleErr)
_ = r.Client.Status().Update(ctx, placementOld, client.FieldOwner(utils.PlacementFieldManagerName))
return ctrl.Result{}, scheduleErr
}
@@ -103,7 +105,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ct
manifests, scheduleErr := r.selectResources(ctx, placementNew)
if scheduleErr != nil {
klog.ErrorS(scheduleErr, "failed to generate the work resource for this placementOld", "placement", placeRef)
updatePlacementScheduledCondition(placementOld, scheduleErr)
r.updatePlacementScheduledCondition(placementOld, scheduleErr)
_ = r.Client.Status().Update(ctx, placementOld, client.FieldOwner(utils.PlacementFieldManagerName))
return ctrl.Result{}, scheduleErr
}
@@ -119,7 +121,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ct
totalCluster, totalResources, scheduleErr := r.persistSelectedResourceUnion(ctx, placementOld, placementNew)
if scheduleErr != nil {
klog.ErrorS(scheduleErr, "failed to record the work resources ", "placement", placeRef)
updatePlacementScheduledCondition(placementOld, scheduleErr)
r.updatePlacementScheduledCondition(placementOld, scheduleErr)
_ = r.Client.Status().Update(ctx, placementOld, client.FieldOwner(utils.PlacementFieldManagerName))
return ctrl.Result{}, scheduleErr
}
@@ -132,7 +134,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ct
scheduleErr = r.scheduleWork(ctx, placementNew, manifests)
if scheduleErr != nil {
klog.ErrorS(scheduleErr, "failed to apply work resources ", "placement", placeRef)
updatePlacementScheduledCondition(placementOld, scheduleErr)
r.updatePlacementScheduledCondition(placementOld, scheduleErr)
_ = r.Client.Status().Update(ctx, placementOld, client.FieldOwner(utils.PlacementFieldManagerName))
return ctrl.Result{}, scheduleErr
}
@@ -145,31 +147,33 @@ func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ct
// as they are not recorded in the old placement status.
// TODO: add them to the old placement selected clusters since the work has been created although the update can still fail
klog.ErrorS(scheduleErr, "failed to remove work resources from previously selected clusters", "placement", placeRef)
updatePlacementScheduledCondition(placementOld, scheduleErr)
r.updatePlacementScheduledCondition(placementOld, scheduleErr)
_ = r.Client.Status().Update(ctx, placementOld, client.FieldOwner(utils.PlacementFieldManagerName))
return ctrl.Result{}, scheduleErr
}
klog.V(3).InfoS("Successfully removed work resources from previously selected clusters", "placement", placementOld.Name, "removed clusters", removed)

// the schedule has succeeded, so we now can use the placementNew status that contains all the newly selected cluster and resources
updatePlacementScheduledCondition(placementNew, nil)
r.Recorder.Event(placementNew, corev1.EventTypeNormal, eventReasonResourceScheduled, "successfully scheduled all selected resources to their clusters")
r.updatePlacementScheduledCondition(placementNew, nil)

// go through all the valid works, get the failed and pending manifests
hasPending, applyErr := r.collectAllManifestsStatus(placementNew)
if applyErr != nil {
klog.ErrorS(applyErr, "failed to collect work resources status from all selected clusters", "placement", placeRef)
updatePlacementAppliedCondition(placementNew, applyErr)
r.updatePlacementAppliedCondition(placementNew, applyErr)
_ = r.Client.Status().Update(ctx, placementNew, client.FieldOwner(utils.PlacementFieldManagerName))
return ctrl.Result{}, applyErr
}
klog.V(3).InfoS("Successfully collected work resources status from all selected clusters", "placement", placementOld.Name, "number of clusters", len(selectedClusters))
klog.V(3).InfoS("Successfully collected work resources status from all selected clusters",
"placement", placementOld.Name, "number of clusters", len(selectedClusters), "hasPending", hasPending,
"numberFailedPlacement", len(placementNew.Status.FailedResourcePlacements))

if !hasPending && len(placementNew.Status.FailedResourcePlacements) == 0 {
updatePlacementAppliedCondition(placementNew, nil)
r.Recorder.Event(placementNew, corev1.EventTypeNormal, eventReasonResourceApplied, "successfully applied all selected resources")
r.updatePlacementAppliedCondition(placementNew, nil)
} else if len(placementNew.Status.FailedResourcePlacements) == 0 {
r.updatePlacementAppliedCondition(placementNew, ErrStillPendingManifest)
} else {
updatePlacementAppliedCondition(placementNew, ErrFailedManifest)
r.updatePlacementAppliedCondition(placementNew, ErrFailedManifest)
}

// we keep a slow reconcile loop here as a backup.
Expand All @@ -190,7 +194,7 @@ func (r *Reconciler) removeAllWorks(ctx context.Context, placement *fleetv1alpha
placement.Status.TargetClusters = nil
placement.Status.SelectedResources = nil
placement.Status.FailedResourcePlacements = nil
updatePlacementScheduledCondition(placement, fmt.Errorf("the placement didn't select any resource or cluster"))
r.updatePlacementScheduledCondition(placement, fmt.Errorf("the placement didn't select any resource or cluster"))
return ctrl.Result{}, r.Client.Status().Update(ctx, placement, client.FieldOwner(utils.PlacementFieldManagerName))
}

@@ -252,7 +256,10 @@ func (r *Reconciler) getPlacement(name string) (*fleetv1alpha1.ClusterResourcePl
return &placement, nil
}

func updatePlacementScheduledCondition(placement *fleetv1alpha1.ClusterResourcePlacement, scheduleErr error) {
// updatePlacementScheduledCondition updates the placement's schedule condition according to the schedule error
func (r *Reconciler) updatePlacementScheduledCondition(placement *fleetv1alpha1.ClusterResourcePlacement, scheduleErr error) {
placementRef := klog.KObj(placement)
schedCond := placement.GetCondition(string(fleetv1alpha1.ResourcePlacementConditionTypeScheduled))
if scheduleErr == nil {
placement.SetConditions(metav1.Condition{
Status: metav1.ConditionTrue,
@@ -261,6 +268,10 @@ func updatePlacementScheduledCondition(placement *fleetv1alpha1.ClusterResourceP
Message: "Successfully scheduled resources for placement",
ObservedGeneration: placement.Generation,
})
if schedCond == nil || schedCond.Status != metav1.ConditionTrue {
klog.V(3).InfoS("successfully scheduled all selected resources to their clusters", "placement", placementRef)
r.Recorder.Event(placement, corev1.EventTypeNormal, "ResourceScheduled", "successfully scheduled all selected resources to their clusters")
}
} else {
placement.SetConditions(metav1.Condition{
Status: metav1.ConditionFalse,
@@ -272,22 +283,51 @@ func updatePlacementScheduledCondition(placement *fleetv1alpha1.ClusterResourceP
}
}

func updatePlacementAppliedCondition(placement *fleetv1alpha1.ClusterResourcePlacement, applyErr error) {
if applyErr == nil {
// updatePlacementAppliedCondition updates the placement's applied condition according to the apply error
func (r *Reconciler) updatePlacementAppliedCondition(placement *fleetv1alpha1.ClusterResourcePlacement, applyErr error) {
placementRef := klog.KObj(placement)
preAppliedCond := placement.GetCondition(string(fleetv1alpha1.ResourcePlacementStatusConditionTypeApplied))
if preAppliedCond != nil {
// this pointer value will be modified by the setCondition, so we need to take a deep copy.
preAppliedCond = preAppliedCond.DeepCopy()
}
switch {
case applyErr == nil:
placement.SetConditions(metav1.Condition{
Status: metav1.ConditionTrue,
Type: string(fleetv1alpha1.ResourcePlacementStatusConditionTypeApplied),
Reason: "applySucceeded",
Reason: ApplySucceededReason,
Message: "Successfully applied resources to member clusters",
ObservedGeneration: placement.Generation,
})
} else {
klog.V(3).InfoS("successfully applied all selected resources", "placement", placementRef)
if preAppliedCond == nil || preAppliedCond.Status != metav1.ConditionTrue {
r.Recorder.Event(placement, corev1.EventTypeNormal, "ResourceApplied", "successfully applied all selected resources")
}
case errors.Is(applyErr, ErrStillPendingManifest):
placement.SetConditions(metav1.Condition{
Status: metav1.ConditionUnknown,
Type: string(fleetv1alpha1.ResourcePlacementStatusConditionTypeApplied),
Reason: ApplyPendingReason,
Message: applyErr.Error(),
ObservedGeneration: placement.Generation,
})
klog.V(3).InfoS("Some selected resources are still waiting to be applied", "placement", placementRef)
if preAppliedCond == nil || preAppliedCond.Status == metav1.ConditionTrue {
r.Recorder.Event(placement, corev1.EventTypeWarning, "ResourceApplyPending", "Some applied resources are now waiting to be applied to the member cluster")
}
default:
// this includes ErrFailedManifest and any other applyError
placement.SetConditions(metav1.Condition{
Status: metav1.ConditionFalse,
Type: string(fleetv1alpha1.ResourcePlacementStatusConditionTypeApplied),
Reason: "applyFailed",
Reason: ApplyFailedReason,
Message: applyErr.Error(),
ObservedGeneration: placement.Generation,
})
klog.V(3).InfoS("failed to apply some selected resources", "placement", placementRef)
if preAppliedCond == nil || preAppliedCond.Status != metav1.ConditionFalse {
r.Recorder.Event(placement, corev1.EventTypeWarning, "ResourceApplyFailed", "failed to apply some selected resources")
}
}
}
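For consumers of the API, the net effect is a richer `Applied` condition. Below is a minimal sketch of how a client might read it, using the reason constants introduced above; it assumes a `ClusterResourcePlacement` already fetched from the hub cluster, and the `example` package name and surrounding client setup are placeholders:

```go
package example

import (
	"fmt"

	fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
	"go.goms.io/fleet/pkg/controllers/clusterresourceplacement"
)

// describeApplyState summarizes the placement's apply progress from the
// Applied condition set by the controller code above.
func describeApplyState(crp *fleetv1alpha1.ClusterResourcePlacement) string {
	cond := crp.GetCondition(string(fleetv1alpha1.ResourcePlacementStatusConditionTypeApplied))
	if cond == nil {
		return "apply state not reported yet"
	}
	switch cond.Reason {
	case clusterresourceplacement.ApplySucceededReason:
		return "all selected resources are applied"
	case clusterresourceplacement.ApplyPendingReason:
		return "some manifests are still being processed by the member clusters"
	case clusterresourceplacement.ApplyFailedReason:
		return fmt.Sprintf("apply failed, check failedResourcePlacements: %s", cond.Message)
	default:
		return fmt.Sprintf("applied condition is %s", cond.Status)
	}
}
```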
6 changes: 4 additions & 2 deletions pkg/controllers/clusterresourceplacement/work_propagation.go
@@ -166,7 +166,9 @@ func (r *Reconciler) collectAllManifestsStatus(placement *fleetv1alpha1.ClusterR
work, err := r.getResourceBinding(memberClusterNsName, workName)
if err != nil {
if apierrors.IsNotFound(err) {
klog.Error(err, "the work does not exist", "work", klog.KRef(memberClusterNsName, workName))
klog.V(3).InfoS("the work change has not shown up in the cache yet",
"work", klog.KRef(memberClusterNsName, workName), "cluster", cluster)
hasPending = true
continue
}
return false, errors.Wrap(err, fmt.Sprintf("failed to get the work obj %s from namespace %s", workName, memberClusterNsName))
@@ -202,7 +204,7 @@ func (r *Reconciler) collectAllManifestsStatus(placement *fleetv1alpha1.ClusterR
appliedCond = meta.FindStatusCondition(manifestCondition.Conditions, workController.ConditionTypeApplied)
// collect if there is an explicit fail
if appliedCond != nil && appliedCond.Status != metav1.ConditionTrue {
klog.V(4).InfoS("find a failed to apply manifest", "member cluster namespace", memberClusterNsName,
klog.V(3).InfoS("find a failed to apply manifest", "member cluster namespace", memberClusterNsName,
"manifest name", manifestCondition.Identifier.Name, "group", manifestCondition.Identifier.Group,
"version", manifestCondition.Identifier.Version, "kind", manifestCondition.Identifier.Kind)
placement.Status.FailedResourcePlacements = append(placement.Status.FailedResourcePlacements, fleetv1alpha1.FailedResourcePlacement{
6 changes: 5 additions & 1 deletion test/e2e/README.md
@@ -2,12 +2,16 @@ Here is how to run e2e locally. Make sure that you have installed Docker and Kin

1. Build the docker images
```shell
export KUBECONFIG=~/.kube/config
OUTPUT_TYPE=type=docker make docker-build-member-agent docker-build-hub-agent docker-build-refresh-token
```

2. Create the kind clusters and install the helm.
```shell
export KUBECONFIG=~/.kube/config
make creat-kind-cluster
```
or
```shell
make create-hub-kind-cluster
make create-member-kind-cluster
make install-helm
25 changes: 14 additions & 11 deletions test/e2e/work_api_test.go
@@ -23,6 +23,7 @@ import (
workapi "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
"sigs.k8s.io/work-api/pkg/utils"

fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
fleetutil "go.goms.io/fleet/pkg/utils"
)

@@ -139,15 +140,15 @@ var _ = Describe("work-api testing", Ordered, func() {

})

It("should apply both the duplicate manifest", func() {
It("should apply both the works with duplicated manifest", func() {
By("creating the work resources")
err = createWork(workOne, HubCluster)
Expect(err).ToNot(HaveOccurred())

err = createWork(workTwo, HubCluster)
Expect(err).ToNot(HaveOccurred())

By("Checking the Applied Work status of each to see if one of the manifest is abandoned.")
By("Checking the Applied Work status of each to see both are applied.")
Eventually(func() bool {
appliedWorkOne, err := retrieveAppliedWork(workOne.Name, MemberCluster)
if err != nil {
@@ -159,7 +160,7 @@ var _ = Describe("work-api testing", Ordered, func() {
return false
}

return len(appliedWorkOne.Status.AppliedResources)+len(appliedWorkTwo.Status.AppliedResources) == 2
return len(appliedWorkOne.Status.AppliedResources) == 1 && len(appliedWorkTwo.Status.AppliedResources) == 1
}, eventuallyTimeout, eventuallyInterval).Should(BeTrue())

By("Checking the work status of each works for verification")
@@ -172,18 +173,20 @@ var _ = Describe("work-api testing", Ordered, func() {
if err != nil {
return false
}
workOneCondition := meta.IsStatusConditionTrue(workOne.Status.ManifestConditions[0].Conditions, "Applied")
workTwoCondition := meta.IsStatusConditionTrue(workTwo.Status.ManifestConditions[0].Conditions, "Applied")
workOneCondition := meta.IsStatusConditionTrue(workOne.Status.ManifestConditions[0].Conditions, string(fleetv1alpha1.ResourcePlacementStatusConditionTypeApplied))
workTwoCondition := meta.IsStatusConditionTrue(workTwo.Status.ManifestConditions[0].Conditions, string(fleetv1alpha1.ResourcePlacementStatusConditionTypeApplied))
return workOneCondition && workTwoCondition
}, eventuallyTimeout, eventuallyInterval).Should(BeTrue())

By("verifying there is only one real resource on the spoke")
By("verifying the one resource on the spoke are owned by both appliedWork")
var deploy appsv1.Deployment
err := MemberCluster.KubeClient.Get(context.Background(), types.NamespacedName{
Name: manifestDetailsOne[0].ObjMeta.Name,
Namespace: manifestDetailsOne[0].ObjMeta.Namespace}, &deploy)
Expect(err).Should(Succeed())
Expect(len(deploy.OwnerReferences)).Should(Equal(2))
Eventually(func() int {
err := MemberCluster.KubeClient.Get(context.Background(), types.NamespacedName{
Name: manifestDetailsOne[0].ObjMeta.Name,
Namespace: manifestDetailsOne[0].ObjMeta.Namespace}, &deploy)
Expect(err).Should(Succeed())
return len(deploy.OwnerReferences)
}, eventuallyTimeout, eventuallyInterval).Should(Equal(2))

By("delete the work two resources")
Expect(deleteWorkResource(workTwo, HubCluster)).To(Succeed())
32 changes: 31 additions & 1 deletion test/integration/cluster_placement_test.go
@@ -25,6 +25,7 @@ import (
workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"

fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/controllers/clusterresourceplacement"
"go.goms.io/fleet/pkg/utils"
)

@@ -119,7 +120,7 @@ var _ = Describe("Test Cluster Resource Placement Controller", func() {
markInternalMCJoined(clusterB)
})

It("Test select the resources by name", func() {
It("Test select the resources by name happy path", func() {
crp = &fleetv1alpha1.ClusterResourcePlacement{
ObjectMeta: metav1.ObjectMeta{
Name: "test-list-resource",
@@ -149,6 +150,15 @@

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: crp.Name}, crp)).Should(Succeed())
verifyPlacementScheduleStatus(crp, 2, 2, metav1.ConditionTrue)
verifyPlacementApplyStatus(crp, metav1.ConditionUnknown, clusterresourceplacement.ApplyPendingReason)

By("Mimic work apply succeeded")
markWorkAppliedStatusSuccess(crp, &clusterA)
markWorkAppliedStatusSuccess(crp, &clusterB)

waitForPlacementScheduleStopped(crp.Name)
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: crp.Name}, crp)).Should(Succeed())
verifyPlacementApplyStatus(crp, metav1.ConditionTrue, clusterresourceplacement.ApplySucceededReason)
})

It("Test select the resources by label", func() {
@@ -192,6 +202,7 @@

Expect(k8sClient.Get(ctx, types.NamespacedName{Name: crp.Name}, crp)).Should(Succeed())
verifyPlacementScheduleStatus(crp, 2, 2, metav1.ConditionTrue)
verifyPlacementApplyStatus(crp, metav1.ConditionUnknown, clusterresourceplacement.ApplyPendingReason)
})

It("Test select all the resources in a namespace", func() {
Expand Down Expand Up @@ -244,6 +255,10 @@ var _ = Describe("Test Cluster Resource Placement Controller", func() {
Expect(len(clusterWork.Spec.Workload.Manifests)).Should(BeIdenticalTo(len(namespacedResource) + 1))
})

XIt("Test some of the resources selectors does not match any resource", func() {

})

It("Test select only the propagated resources in a namespace", func() {
By("Create a lease resource in the namespace")
lease := coordv1.Lease{
@@ -750,6 +765,7 @@
verifyWorkObjects(crp, namespacedResource, []*fleetv1alpha1.MemberCluster{&clusterB})
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: crp.Name}, crp)).Should(Succeed())
verifyPlacementScheduleStatus(crp, len(namespacedResource), 1, metav1.ConditionTrue)
verifyPlacementApplyStatus(crp, metav1.ConditionUnknown, clusterresourceplacement.ApplyPendingReason)

By("Verify that work is not created in cluster A")
var clusterWork workv1alpha1.Work
@@ -1055,4 +1071,18 @@
By("Verified that the deleted clusterRole is removed from the work")
})
})

Context("Test with simulated work api functionality", func() {
BeforeEach(func() {
By("Mark member cluster A as joined")
markInternalMCJoined(clusterA)

By("Mark member cluster B as joined")
markInternalMCJoined(clusterB)
})

XIt("Test partial failed apply", func() {

})
})
})
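Note on the integration tests above: `markWorkAppliedStatusSuccess` is referenced but not part of this diff. A rough, hypothetical sketch of what such a helper presumably does — patch the generated `Work`'s status so `collectAllManifestsStatus` sees neither pending nor failed manifests — assuming the work carries the placement's name and lives in the member cluster namespace:

```go
package example

import (
	"context"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1"
)

// Hypothetical sketch only; the real helper lives elsewhere in the test suite,
// and the work name, namespace, and reason strings here are assumptions.
func markWorkAppliedStatusSuccessSketch(ctx context.Context, c client.Client, workName, memberClusterNs string) error {
	var work workv1alpha1.Work
	if err := c.Get(ctx, types.NamespacedName{Namespace: memberClusterNs, Name: workName}, &work); err != nil {
		return err
	}
	// Mark the work itself as applied.
	meta.SetStatusCondition(&work.Status.Conditions, metav1.Condition{
		Type:               "Applied",
		Status:             metav1.ConditionTrue,
		Reason:             "appliedWorkComplete",
		ObservedGeneration: work.Generation,
	})
	// Mark every manifest as applied so none is counted as failed.
	for i := range work.Status.ManifestConditions {
		meta.SetStatusCondition(&work.Status.ManifestConditions[i].Conditions, metav1.Condition{
			Type:   "Applied",
			Status: metav1.ConditionTrue,
			Reason: "appliedManifestComplete",
		})
	}
	return c.Status().Update(ctx, &work)
}
```

With the works for both member clusters marked this way, the happy-path test expects the placement's `Applied` condition to become True with reason `ApplySucceeded`.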