【CSEN037】CS Simulator with OPC UA Client

Background - What is OPC UA

OPC Unified Architecture (OPC UA) is a cross-platform, open-source standard (IEC 62541) developed by the OPC Foundation for data exchange from sensors to cloud applications. Its distinguishing characteristics are:

  • Focus on data collection and control communication, with widespread use in industrial equipment and systems
  • Open - open-source reference implementations are freely available to OPC Foundation members, and to non-members under the GPL 2.0 license
  • Cross-platform - not tied to one operating system or programming language
  • Service-oriented architecture (SOA)
  • Extensible security profiles, including authentication, authorization, encryption and checksums
  • The future standard for communication and information modeling in automation

Preparation - How to Test on Your Computer

We can use VMware Workstation to run the CS Simulator and test it. Refer to the CS Simulator Installation Guide for further details.

After installing VMware Workstation, we must install the OPC UA Server plugin in the simulator to test the connection. Follow these steps to install the plugin:

Step1

Open the simulator for the model you wish to test. Once program loading is finished, click on the logo located in the upper right corner. Select "Settings" from the menu.



Step2

Click ELITECOs on the left, then click the + sign to add the plugin.



Step3

Double-click the ELITECOs folder, then the fieldbus folder, then the OpcuaServer folder. Select the EliOPCUAServer-1.2.6.0.elico file and open it.




Step4

Click the Inactivation tab, then select the restart button and confirm the activation of the plugin.



Step5

Open the simulator on your desktop, click on the "Config" button, select the OPC UA Server from the Plugin menu located at the bottom, and then click on the "Start" button to start the OPC UA Server.



Step6

Wait for the state to change to Running, indicating that the OPC UA Server has been launched successfully.



Getting Started - Learn how to remotely connect to and control the simulator

First, connect to the OPC UA Server using UaExpert to test the connection and view relevant node information. Detailed instructions for installation and usage can be found in the OPCUA Manual.

To obtain the IP address of the OPC UA Server, type "ifconfig" in a terminal inside the virtual machine.
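The IP address goes into the endpoint URL that the client connects to. The scripts later in this article use the form opc.tcp://<ip>:4840. As a minimal sketch, the helper below builds such a URL and rejects malformed addresses early. The function name build_endpoint_url and the constant DEFAULT_PORT are hypothetical and not part of asyncua; it assumes an IPv4 address, as printed by ifconfig.

```python
import ipaddress

# Port taken from the example URLs in this article; adjust it if your server differs.
DEFAULT_PORT = 4840


def build_endpoint_url(ip: str, port: int = DEFAULT_PORT) -> str:
    """Return an opc.tcp:// endpoint URL for the given IPv4 address (hypothetical helper)."""
    # IPv4Address raises ValueError for malformed input such as "192.168.1".
    address = ipaddress.IPv4Address(ip)
    return f"opc.tcp://{address}:{port}"


if __name__ == "__main__":
    print(build_endpoint_url("192.168.142.128"))
```

The returned string can be passed as the url argument of Client(), or pasted into UaExpert when adding the server.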


After a successful connection, we can drag a node to the Data Access View to see its information. The NodeId can be found in the Attributes window.


So how can we read node information and call methods in Python?

FreeOpcUa (official website: http://freeopcua.github.io/) provides open-source tools for making OPC UA connections from Python scripts. The rest of this article uses asyncua (https://github.com/FreeOpcUa/opcua-asyncio).

Setting up a Python environment is not covered here. We assume that Python 3.8 or higher is installed; the scripts may fail on Python 3.7 or lower. If your environment does not meet this requirement, install a suitable version of Python first.
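To check the interpreter before installing anything, a short script like the following can be used. It is only a sketch: the function name python_is_supported and the constant MIN_VERSION are hypothetical, and the 3.8 threshold is the one stated in this article.

```python
import sys

# Minimum version assumed by this article.
MIN_VERSION = (3, 8)


def python_is_supported(version_info=sys.version_info, minimum=MIN_VERSION) -> bool:
    """Return True if the (major, minor) part of version_info is at least minimum."""
    return tuple(version_info[:2]) >= minimum


if __name__ == "__main__":
    current = ".".join(str(part) for part in sys.version_info[:3])
    if python_is_supported():
        print(f"Python {current} meets the requirement")
    else:
        print(f"Python {current} is too old, please install 3.8 or higher")
```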

Step1

The opcua-asyncio package is available on PyPI as the asyncua package. To install it, run:

pip install asyncua

Once the installation is complete, the package is ready to use. To verify the installation, or to report the version when filing a bug report, run the following code in a Python interpreter:

import asyncua
print(asyncua.__version__)

If the version number is returned correctly, asyncua is successfully installed, and we can learn how to utilize it from the official documentation (https://opcua-asyncio.readthedocs.io/).

Step2

Now we can create scripts for communicating with OPC UA. Below is a basic template that we can use to retrieve information about nodes and execute method calls.

import asyncio
import logging
from asyncua import Client

logging.basicConfig(level=logging.ERROR)
_logger = logging.getLogger('asyncua')

async def main():
    #create connection, please modify the ip of the url before connection
    client = Client(url="opc.tcp://192.168.142.128:4840")
    client.set_user('elite')
    client.set_password('elibot')
    await client.connect()

    print("Client connected")

    try:
        #IsPowerON
        is_power_on = client.get_node("ns=1;i=54275")
        name = (await is_power_on.read_browse_name()).Name
        value = (await is_power_on.read_value())
        print(f"Before:{name} = {value}")

        #PowerON
        object_node = client.get_node("ns=1;i=54267")
        method_node = client.get_node("ns=6;i=7001")
        input_arguments = []
        output_arguments = await object_node.call_method(method_node, *input_arguments)

        #Wait 3 seconds
        await asyncio.sleep(3)

        #IsPowerON
        is_power_on = client.get_node("ns=1;i=54275")
        name = (await is_power_on.read_browse_name()).Name
        value = (await is_power_on.read_value())
        print(f"After:{name} = {value}")

    except Exception:
        _logger.exception('error')
    finally:
        await client.disconnect()

asyncio.run(main())

Change the IP address in client = Client(url="opc.tcp://192.168.142.128:4840") to the IP address of the virtual machine, then run the script. It queries the value of the IsPowerON node, calls the PowerON method to power on the robot in the simulator, waits 3 seconds, and queries the IsPowerON node again. In the virtual machine, the simulator's power status changes from "Power off" to "Idle", and the Python terminal prints the following:

Client connected

Before:IsPowerOn = False

After:IsPowerOn = True



Below we will describe how to write node queries and method calls:

First, observe the following statements:

#IsPowerON
is_power_on = client.get_node("ns=1;i=54275")
name = (await is_power_on.read_browse_name()).Name
value = (await is_power_on.read_value())
print(f"Before:{name} = {value}")

To read a different node, we simply replace the NodeId passed to client.get_node() and read its name and value in the same way. The NodeId can be looked up in UaExpert, as described above.
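The NodeId strings used in this article have the form ns=<namespace index>;i=<numeric identifier>. When copying them from UaExpert, a small check can catch typos before the script connects. The following sketch uses only the standard library; the names NumericNodeId, parse_numeric_node_id and format_numeric_node_id are hypothetical, and it deliberately handles only numeric identifiers, not string, GUID or opaque ones.

```python
import re
from typing import NamedTuple


class NumericNodeId(NamedTuple):
    namespace: int
    identifier: int


# Matches only the numeric form used in this article, e.g. "ns=1;i=54275".
_NODE_ID_PATTERN = re.compile(r"ns=(\d+);i=(\d+)")


def parse_numeric_node_id(text: str) -> NumericNodeId:
    """Split a string such as 'ns=1;i=54275' into namespace index and identifier."""
    match = _NODE_ID_PATTERN.fullmatch(text.strip())
    if match is None:
        raise ValueError(f"not a numeric NodeId string: {text!r}")
    return NumericNodeId(int(match.group(1)), int(match.group(2)))


def format_numeric_node_id(node_id: NumericNodeId) -> str:
    """Build the string form expected by client.get_node()."""
    return f"ns={node_id.namespace};i={node_id.identifier}"


if __name__ == "__main__":
    parsed = parse_numeric_node_id("ns=1;i=54275")
    print(parsed.namespace, parsed.identifier)
    print(format_numeric_node_id(parsed))
```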

Exercise: Query the NodeId for IsProgramRunning and write a Python program to determine if the robot in the simulator is currently running a program.


Next, observe the following statements:

#PowerON
object_node = client.get_node("ns=1;i=54267")
method_node = client.get_node("ns=6;i=7001")
input_arguments = []
output_arguments = await object_node.call_method(method_node, *input_arguments)

To call a method, we first get the object that owns the method by its NodeId, then get the method itself by its NodeId. We fill input_arguments with the method's input parameters (leaving the list empty if the method takes none) and pass the method node and the arguments to call_method().

Exercise: Query the NodeId of the PowerOFF method and write a Python program to turn off the power of the robot in the simulator.


The next program shows how to look up a NodeId from the node's browse path.

import asyncio
import logging
from asyncua import Client

logging.basicConfig(level=logging.ERROR)
_logger = logging.getLogger('asyncua')

async def main():
    #create connection, please modify the ip of the url before connection
    client = Client(url="opc.tcp://192.168.142.128:4840")
    client.set_user('elite')
    client.set_password('elibot')
    await client.connect()

    print("Client connected")

    try:
        #get NodeId
        nodeid = await client.nodes.root.get_child(["0:Objects", "1:EliteROBOTS","6:Controller","6:ControllerOperatingInfo","6:Status","6:IsPowerOn"])
        print(f"IsPowerON:{nodeid}")

    except Exception:
        _logger.exception('error')
    finally:
        await client.disconnect()

asyncio.run(main())

Observe this part of the program.

#get NodeId
nodeid = await client.nodes.root.get_child(["0:Objects","1:EliteROBOTS","6:Controller","6:ControllerOperatingInfo","6:Status","6:IsPowerOn"])
print(f"IsPowerON:{nodeid}")

By running the program, this result can be obtained:

Client connected

IsPowerON:ns=1;i=54275

The result shows that the NodeId of a node can be obtained through client.nodes.root.get_child(). To look up other nodes, change only the BrowsePath argument. The BrowsePath can be worked out in UaExpert: starting from the Objects node, take the BrowseName attribute of each node along the way, join its namespace index and name with ":", and list the results in order.
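The joining rule can be sketched in a few lines of plain Python. The helper name build_browse_path is hypothetical; the namespace indices and names in the example are the ones from the IsPowerOn path shown above.

```python
from typing import Iterable, List, Tuple


def build_browse_path(segments: Iterable[Tuple[int, str]]) -> List[str]:
    """Join each (namespace index, name) pair with ':' and keep the order."""
    return [f"{namespace}:{name}" for namespace, name in segments]


# The IsPowerOn path from this article, written as (namespace index, name) pairs.
IS_POWER_ON_PATH = build_browse_path([
    (0, "Objects"),
    (1, "EliteROBOTS"),
    (6, "Controller"),
    (6, "ControllerOperatingInfo"),
    (6, "Status"),
    (6, "IsPowerOn"),
])

if __name__ == "__main__":
    print(IS_POWER_ON_PATH)
```

The resulting list has the same shape as the argument passed to client.nodes.root.get_child() in the program above.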

Exercise: Write a Python program to retrieve the NodeId for IsProgramRunning via get_child().


Once we have become proficient in obtaining the NodeId through this method, we can proceed to retrieve the node tree.

import asyncio
import logging
from asyncua import Client, Node, ua

logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('asyncua')

async def browse_nodes(node: Node):
    """
    Build a nested node tree dict by recursion (filtered by OPC UA objects and variables).
    """
    node_class = await node.read_node_class()
    children = []
    for child in await node.get_children():
        if await child.read_node_class() in [ua.NodeClass.Object, ua.NodeClass.Variable]:
            children.append(
                await browse_nodes(child)
            )
    if node_class != ua.NodeClass.Variable:
        var_type = None
    else:
        try:
            var_type = (await node.read_data_type_as_variant_type()).value
        except ua.UaError:
            _logger.warning('Node Variable Type could not be determined for %r', node)
            var_type = None
    return {
        'id': node.nodeid.to_string(),
        'name': (await node.read_display_name()).Text,
        'cls': node_class.value,
        'children': children,
        'type': var_type,
    }

async def task(loop):
    url = "opc.tcp://192.168.142.128:4840"
    # url = "opc.tcp://localhost:4840/freeopcua/server/"
    try:
        client = Client(url=url)
        client.set_user('elite')
        client.set_password('elibot')
        # client.set_security_string()
        await client.connect()
        # Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
        root = client.nodes.root
        _logger.info("Root node is: %r", root)

        # Node objects have methods to read and write node attributes as well as browse or populate address space
        _logger.info("Children of root are: %r", await root.get_children())

        tree = await browse_nodes(client.nodes.objects)
        _logger.info('Node tree: %r', tree)
    except Exception:
        _logger.exception('error')
    finally:
        await client.disconnect()

def main():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.set_debug(True)
    loop.run_until_complete(task(loop))
    loop.close()

if __name__ == "__main__":
    main()

Exercise: Modify and run the program, then view the results. Consider how the output is generated.
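The program logs the whole tree as one nested dict, which is hard to read for large address spaces. As a sketch of one way to inspect it, the helper below turns a dict with the same keys as the return value of browse_nodes() ('id', 'name', 'cls', 'children', 'type') into an indented outline. The function name format_tree is hypothetical, and SAMPLE_TREE is made-up, simplified data for illustration: only the IsPowerOn NodeId comes from this article, and the real tree is deeper.

```python
def format_tree(node: dict, depth: int = 0) -> list:
    """Return one outline line per node, indented two spaces per level."""
    lines = [f"{'  ' * depth}{node['name']} ({node['id']})"]
    for child in node['children']:
        lines.extend(format_tree(child, depth + 1))
    return lines


# Made-up sample in the shape produced by browse_nodes(); not real server output.
SAMPLE_TREE = {
    'id': 'i=85', 'name': 'Objects', 'cls': 1, 'type': None,
    'children': [
        {
            'id': 'ns=1;i=1000', 'name': 'EliteROBOTS', 'cls': 1, 'type': None,
            'children': [
                {'id': 'ns=1;i=54275', 'name': 'IsPowerOn', 'cls': 2,
                 'type': 1, 'children': []},
            ],
        },
    ],
}

if __name__ == "__main__":
    print("\n".join(format_tree(SAMPLE_TREE)))
```

In the program above, the same helper could be applied to the tree variable in place of the single _logger.info('Node tree: %r', tree) call.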
