Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
example_lamp_endpoint.py 1.46 KiB
#
# SPDX-License-Identifier: Apache-2.0
#
# SPDX-FileCopyrightText: Huawei Inc.
#
import asyncio
from eddie_endpoint import EddieEndpoint
import aiocoap
import logging
from pprint import pprint

# Set the root logger to DEBUG.
logging.getLogger().setLevel(logging.DEBUG)

# Name handed to EddieEndpoint when main() creates the endpoint.
ENDPOINT_NAME = "eddie-lamp-node"

# This is a simple example of an Eddie resource that mocks a lamp.
# The code to retrieve the lamp status and to actually turn the lamp
# on and off should be in render_{get/put}.
class EddieLamp(aiocoap.resource.Resource):
    """In-memory mock of a lamp, exposed as an aiocoap resource.

    The on/off state is kept in ``lamp_status``; nothing in this class
    talks to real hardware.
    """

    def __init__(self):
        super().__init__()
        # Same string main() uses as resource_type when querying the
        # resource directory; presumably the advertised "rt" -- confirm
        # against EddieEndpoint.
        self.rt = "eddie.lamp"
        # The lamp starts switched off.
        self.lamp_status = False

    async def render_get(self, request):
        """Reply with ``b"on"`` or ``b"off"`` according to the current state."""
        state = b"on" if self.lamp_status else b"off"
        return aiocoap.Message(payload=state)

    async def render_put(self, request):
        """Switch the lamp: on for payload ``b"on"``, off for anything else.

        Always answers with a CHANGED response code.
        """
        self.lamp_status = request.payload == b"on"
        return aiocoap.Message(code=aiocoap.CHANGED)


async def main():
    """Serve one mock lamp, publish it, list known lamps, then idle.

    Creates an EddieEndpoint named ENDPOINT_NAME, registers an EddieLamp
    under the path ``lamp1``, starts the server and publishes its
    resources to the resource directory. It then queries the directory
    for resources of type ``eddie.lamp`` and prints each result before
    waiting indefinitely.
    """
    endpoint = EddieEndpoint(ENDPOINT_NAME)
    lamp = EddieLamp()
    endpoint.add_resource(["lamp1"], lamp)

    await endpoint.start_server()
    await endpoint.publish_resources_to_rd()

    lamps = await endpoint.get_resources_from_rd(resource_type="eddie.lamp")
    print("lamps status: ")
    for entry in lamps:
        print(entry)

    # Run forever: nothing in this function ever resolves this future,
    # so awaiting it keeps the coroutine (and the server) alive.
    loop = asyncio.get_running_loop()
    never_done = loop.create_future()
    await never_done

# Start the event loop and run main() only when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())